[jira] [Commented] (KAFKA-3978) Cannot truncate to a negative offset (-1) exception at broker startup
[ https://issues.apache.org/jira/browse/KAFKA-3978?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16398124#comment-16398124 ] ASF GitHub Bot commented on KAFKA-3978: --- hachikuji closed pull request #4695: KAFKA-3978; highwatermark should always be positive URL: https://github.com/apache/kafka/pull/4695 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic):
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 3b97671524d..68faf00c079 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -460,7 +460,11 @@ class Partition(val topic: String,
     }.map(_.logEndOffset)
     val newHighWatermark = allLogEndOffsets.min(new LogOffsetMetadata.OffsetOrdering)
     val oldHighWatermark = leaderReplica.highWatermark
-    if (oldHighWatermark.messageOffset < newHighWatermark.messageOffset || oldHighWatermark.onOlderSegment(newHighWatermark)) {
+
+    // Ensure that the high watermark increases monotonically. We also update the high watermark when the new
+    // offset metadata is on a newer segment, which occurs whenever the log is rolled to a new segment.
+    if (oldHighWatermark.messageOffset < newHighWatermark.messageOffset ||
+      (oldHighWatermark.messageOffset == newHighWatermark.messageOffset && oldHighWatermark.onOlderSegment(newHighWatermark))) {
       leaderReplica.highWatermark = newHighWatermark
       debug(s"High watermark updated to $newHighWatermark")
       true
diff --git a/core/src/main/scala/kafka/cluster/Replica.scala b/core/src/main/scala/kafka/cluster/Replica.scala
index e41e389e22d..030e5b7eb58 100644
--- a/core/src/main/scala/kafka/cluster/Replica.scala
+++ b/core/src/main/scala/kafka/cluster/Replica.scala
@@ -138,6 +138,9 @@ class Replica(val brokerId: Int,
   def highWatermark_=(newHighWatermark: LogOffsetMetadata) {
     if (isLocal) {
+      if (newHighWatermark.messageOffset < 0)
+        throw new IllegalArgumentException("High watermark offset should be non-negative")
+
       highWatermarkMetadata = newHighWatermark
       log.foreach(_.onHighWatermarkIncremented(newHighWatermark.messageOffset))
       trace(s"Setting high watermark for replica $brokerId partition $topicPartition to [$newHighWatermark]")
@@ -165,9 +168,16 @@ class Replica(val brokerId: Int,
       s"non-local replica $brokerId"))
   }

-  def convertHWToLocalOffsetMetadata() = {
+  /*
+   * Convert hw to local offset metadata by reading the log at the hw offset.
+   * If the hw offset is out of range, return the first offset of the first log segment as the offset metadata.
+   */
+  def convertHWToLocalOffsetMetadata() {
     if (isLocal) {
-      highWatermarkMetadata = log.get.convertToOffsetMetadata(highWatermarkMetadata.messageOffset)
+      highWatermarkMetadata = log.get.convertToOffsetMetadata(highWatermarkMetadata.messageOffset).getOrElse {
+        val firstOffset = log.get.logSegments.head.baseOffset
+        new LogOffsetMetadata(firstOffset, firstOffset, 0)
+      }
     } else {
       throw new KafkaException(s"Should not construct complete high watermark on partition $topicPartition's non-local replica $brokerId")
     }
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 257dd8f9ba4..f0050f54aef 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -1126,14 +1126,14 @@ class Log(@volatile var dir: File,
   /**
    * Given a message offset, find its corresponding offset metadata in the log.
-   * If the message offset is out of range, return unknown offset metadata
+   * If the message offset is out of range, return None to the caller.
    */
-  def convertToOffsetMetadata(offset: Long): LogOffsetMetadata = {
+  def convertToOffsetMetadata(offset: Long): Option[LogOffsetMetadata] = {
     try {
       val fetchDataInfo = readUncommitted(offset, 1)
-      fetchDataInfo.fetchOffsetMetadata
+      Some(fetchDataInfo.fetchOffsetMetadata)
     } catch {
-      case _: OffsetOutOfRangeException => LogOffsetMetadata.UnknownOffsetMetadata
+      case _: OffsetOutOfRangeException => None
     }
   }
diff --git a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala
index 2a24a37f151..0c41519d211 100644
--- a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala
@@ -76,6 +76,32 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging {
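Restated as a minimal Java sketch (illustrative only; the shipped code is the Scala above, and the OffsetMetadata stand-in below is hypothetical): the high watermark may never be negative, and it may only advance to a strictly larger offset, or to the same offset when the metadata has moved to a newer segment.
{code:java}
// Illustrative sketch only -- not the shipped Kafka code.
final class HighWatermark {
    // Hypothetical stand-in for kafka.server.LogOffsetMetadata.
    static final class OffsetMetadata {
        final long messageOffset;
        final long segmentBaseOffset;

        OffsetMetadata(long messageOffset, long segmentBaseOffset) {
            if (messageOffset < 0)
                throw new IllegalArgumentException("High watermark offset should be non-negative");
            this.messageOffset = messageOffset;
            this.segmentBaseOffset = segmentBaseOffset;
        }

        boolean onOlderSegment(OffsetMetadata other) {
            return this.segmentBaseOffset < other.segmentBaseOffset;
        }
    }

    // Mirrors the patched condition: advance on a strictly larger offset, or on
    // the same offset when the metadata moved to a newer segment (log roll).
    static boolean shouldUpdate(OffsetMetadata oldHw, OffsetMetadata newHw) {
        return oldHw.messageOffset < newHw.messageOffset
            || (oldHw.messageOffset == newHw.messageOffset && oldHw.onOlderSegment(newHw));
    }
}
{code}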
[jira] [Commented] (KAFKA-6647) KafkaStreams.cleanUp creates .lock file in directory its trying to clean
[ https://issues.apache.org/jira/browse/KAFKA-6647?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16398119#comment-16398119 ] Matthias J. Sax commented on KAFKA-6647: [~gbloggs] Thanks for reporting the issue. What I am wondering is why there is a .lock file in the task directory in the first place. On a clean shutdown, all lock files should be released. Thus, an existing .lock file indicates that some thread is actually using the task directory, and thus it should not be deleted (ie, it's expected that cleanUp() fails in this case). Also note, a thread can delete a directory even if the directory contains its own lock file (because the thread owns this lock). Does this make sense? > KafkaStreams.cleanUp creates .lock file in directory its trying to clean > > > Key: KAFKA-6647 > URL: https://issues.apache.org/jira/browse/KAFKA-6647 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 1.0.1 > Environment: windows 10. > java version "1.8.0_162" > Java(TM) SE Runtime Environment (build 1.8.0_162-b12) > Java HotSpot(TM) 64-Bit Server VM (build 25.162-b12, mixed mode) > org.apache.kafka:kafka-streams:1.0.1 > Kafka commitId : c0518aa65f25317e >Reporter: George Bloggs >Priority: Minor > Labels: streams > > When calling kafkaStreams.cleanUp() before starting a stream the > StateDirectory.cleanRemovedTasks() method contains this check: > {code:java} > ... Line 240 > if (lock(id, 0)) { > long now = time.milliseconds(); > long lastModifiedMs = taskDir.lastModified(); > if (now > lastModifiedMs + cleanupDelayMs) { > log.info("{} Deleting obsolete state directory {} > for task {} as {}ms has elapsed (cleanup delay is {}ms)", logPrefix(), > dirName, id, now - lastModifiedMs, cleanupDelayMs); > Utils.delete(taskDir); > } > } > {code} > The check for lock(id,0) will create a .lock file in the directory that > subsequently is going to be deleted. If the .lock file already exists from a > previous run the attempt to delete the .lock file fails with > AccessDeniedException. > This leaves the .lock file in the taskDir. Calling Utils.delete(taskDir) will > then attempt to remove the taskDir path calling Files.delete(path). > The call to files.delete(path) in postVisitDirectory will then fail > java.nio.file.DirectoryNotEmptyException as the failed attempt to delete the > .lock file left the directory not empty. (o.a.k.s.p.internals.StateDirectory > : stream-thread [restartedMain] Failed to lock the state directory due > to an unexpected exception) > This seems to then cause issues using streams from a topic to an inMemory > store. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-6646) Add a GlobalKStream object type for stream event broadcast
[ https://issues.apache.org/jira/browse/KAFKA-6646?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-6646: --- Labels: needs-kip (was: ) > Add a GlobalKStream object type for stream event broadcast > -- > > Key: KAFKA-6646 > URL: https://issues.apache.org/jira/browse/KAFKA-6646 > Project: Kafka > Issue Type: New Feature > Components: streams >Affects Versions: 1.1.0 >Reporter: Antony Stubbs >Priority: Major > Labels: needs-kip > > There are some use cases where having a global KStream object is useful. For > example, where a single event is sent, with very low frequency, to a cluster > of Kafka stream nodes to trigger all nodes to do some processing of state > stored on their instance. > The current workaround is either to create a second KStream app instance (being > careful to configure it with a different state dir and to give it a unique app > name per instance) and create a KStream in each one, or to use the normal > consumer client inside your KStream app with unique consumer groups. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
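The second workaround (a plain consumer client with a per-instance consumer group) could look roughly like the sketch below; the topic name, serdes, and group-id scheme are illustrative assumptions, not part of the ticket:
{code:java}
import java.util.Collections;
import java.util.Properties;
import java.util.UUID;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class BroadcastListener {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // A unique group per instance means every instance is assigned every
        // partition, i.e. every instance sees every event (broadcast).
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "control-" + UUID.randomUUID());
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("control-topic"));
            while (true) {
                for (ConsumerRecord<String, String> record : consumer.poll(1000)) {
                    // Trigger the instance-local processing here.
                    System.out.println("Broadcast event: " + record.value());
                }
            }
        }
    }
}
{code}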
[jira] [Commented] (KAFKA-6645) Host Affinity to facilitate faster restarts of kafka streams applications
[ https://issues.apache.org/jira/browse/KAFKA-6645?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16398118#comment-16398118 ] Matthias J. Sax commented on KAFKA-6645: This should be supported already. On startup, Kafka Streams inspects its local state directory and adds the available stores to the rebalance metadata (ie, prev-assigned standby tasks). This allows partitions to be reassigned accordingly, avoiding state migration. Note that the upcoming 1.1 release contains some improvements to partition assignment. This Jira might still be valid in order to improve the existing strategy further. It would be great if you could try out the 1.0 or, better, the 1.1 release and check whether Kafka Streams behaves as expected. If not, it would be great to learn what is missing in detail so we can close those gaps. Thanks a lot! > Host Affinity to facilitate faster restarts of kafka streams applications > - > > Key: KAFKA-6645 > URL: https://issues.apache.org/jira/browse/KAFKA-6645 > Project: Kafka > Issue Type: New Feature > Components: streams >Reporter: Giridhar Addepalli >Priority: Major > > Since Kafka Streams applications in general have a lot of state in their stores, > it would be good to remember the assignment of partitions to machines, so that > when the whole application is restarted for some reason, there is a way to use > the past assignment of partitions to machines and there won't be a need to > build up the whole state by reading off of the changelog Kafka topic. This would > result in faster start-up. > Samza has support for Host Affinity > ([https://samza.apache.org/learn/documentation/0.14/yarn/yarn-host-affinity.html]) > KIP-54 > ([https://cwiki.apache.org/confluence/display/KAFKA/KIP-54+-+Sticky+Partition+Assignment+Strategy)] > handles cases where some members of a consumer group go down / come up, > and ensures there is a minimal diff between the assignments before and after a > rebalance. > But to handle the whole-restart use case, we need to remember the past > assignment somewhere, and use it after restart. > Please let us know if this is an already solved problem / if there is some > cleaner way of achieving this objective -- This message was sent by Atlassian JIRA (v7.6.3#76005)
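For reference, the two settings that make this behavior work today are a stable state directory and standby replicas; a minimal config sketch (the application id, servers, path, and replica count below are placeholders):
{code:java}
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

public class AffinityFriendlyConfig {
    public static Properties build() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // A stable state dir across restarts lets the instance report its
        // locally available stores in the rebalance metadata.
        props.put(StreamsConfig.STATE_DIR_CONFIG, "/var/lib/kafka-streams/my-app");
        // Standby replicas keep warm copies of stores on other instances,
        // shortening restoration after a failover.
        props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
        return props;
    }
}
{code}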
[jira] [Updated] (KAFKA-6645) Host Affinity to facilitate faster restarts of kafka streams applications
[ https://issues.apache.org/jira/browse/KAFKA-6645?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Giridhar Addepalli updated KAFKA-6645: -- Description: Since Kafka Streams applications in general have a lot of state in their stores, it would be good to remember the assignment of partitions to machines, so that when the whole application is restarted for some reason, there is a way to use the past assignment of partitions to machines and there won't be a need to build up the whole state by reading off of the changelog Kafka topic. This would result in faster start-up. Samza has support for Host Affinity ([https://samza.apache.org/learn/documentation/0.14/yarn/yarn-host-affinity.html]) KIP-54 ([https://cwiki.apache.org/confluence/display/KAFKA/KIP-54+-+Sticky+Partition+Assignment+Strategy)] handles cases where some members of a consumer group go down / come up, and ensures there is a minimal diff between the assignments before and after a rebalance. But to handle the whole-restart use case, we need to remember the past assignment somewhere, and use it after restart. Please let us know if this is an already solved problem / if there is some cleaner way of achieving this objective was: Since Kafka Streams applications in general have a lot of state in their stores, it would be good to remember the assignment of partitions to machines, so that when the whole application is restarted for whatever reason, there is a way to use the past assignment of partitions to machines and there won't be a need to build up state by reading off of the changelog Kafka topic, which would result in faster start-up. Samza has support for Host Affinity ([https://samza.apache.org/learn/documentation/0.14/yarn/yarn-host-affinity.html]) KIP-54 ([https://cwiki.apache.org/confluence/display/KAFKA/KIP-54+-+Sticky+Partition+Assignment+Strategy)] handles cases where some members of a consumer group go down / come up, and ensures there is a minimal diff between the assignments before and after a rebalance. But to handle the whole-restart use case, we need to remember the past assignment somewhere, and use it after restart. Please let us know if this is an already solved problem / if there is some cleaner way of achieving this objective > Host Affinity to facilitate faster restarts of kafka streams applications > - > > Key: KAFKA-6645 > URL: https://issues.apache.org/jira/browse/KAFKA-6645 > Project: Kafka > Issue Type: New Feature > Components: streams >Reporter: Giridhar Addepalli >Priority: Major > > Since Kafka Streams applications in general have a lot of state in their stores, > it would be good to remember the assignment of partitions to machines, so that > when the whole application is restarted for some reason, there is a way to use > the past assignment of partitions to machines and there won't be a need to > build up the whole state by reading off of the changelog Kafka topic. This would > result in faster start-up. > Samza has support for Host Affinity > ([https://samza.apache.org/learn/documentation/0.14/yarn/yarn-host-affinity.html]) > KIP-54 > ([https://cwiki.apache.org/confluence/display/KAFKA/KIP-54+-+Sticky+Partition+Assignment+Strategy)] > handles cases where some members of a consumer group go down / come up, > and ensures there is a minimal diff between the assignments before and after a > rebalance. > But to handle the whole-restart use case, we need to remember the past > assignment somewhere, and use it after restart. > Please let us know if this is an already solved problem / if there is some > cleaner way of achieving this objective -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6649) ReplicaFetcher stopped after non fatal exception is thrown
[ https://issues.apache.org/jira/browse/KAFKA-6649?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16398074#comment-16398074 ] ASF GitHub Bot commented on KAFKA-6649: --- huxihx opened a new pull request #4707: KAFKA-6649: Should catch OutOfRangeException for ReplicaFetcherThread URL: https://github.com/apache/kafka/pull/4707 https://issues.apache.org/jira/browse/KAFKA-6649 `AbstractFetcherThread.processFetchRequest` should catch OffsetOutOfRangeException so that the thread is not forcibly stopped. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > ReplicaFetcher stopped after non fatal exception is thrown > -- > > Key: KAFKA-6649 > URL: https://issues.apache.org/jira/browse/KAFKA-6649 > Project: Kafka > Issue Type: Bug > Components: replication >Affects Versions: 1.0.0, 0.11.0.2, 1.1.0, 1.0.1 >Reporter: Julio Ng >Priority: Major > > We have seen several under-replicated partitions, usually triggered by topic > creation. After digging in the logs, we see the below: > {noformat} > [2018-03-12 22:40:17,641] ERROR [ReplicaFetcher replicaId=12, leaderId=0, > fetcherId=1] Error due to (kafka.server.ReplicaFetcherThread) > kafka.common.KafkaException: Error processing data for partition > [[TOPIC_NAME_REMOVED]]-84 offset 2098535 > at > kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:204) > at > kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:169) > at scala.Option.foreach(Option.scala:257) > at > kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:169) > at > kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:166) > at > scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) > at > kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply$mcV$sp(AbstractFetcherThread.scala:166) > at > kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:166) > at > kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:166) > at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:250) > at > kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:164) > at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:111) > at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82) > Caused by: org.apache.kafka.common.errors.OffsetOutOfRangeException: Cannot > increment the log start offset to 2098535 of partition > [[TOPIC_NAME_REMOVED]]-84 since it is larger than the high watermark -1 > [2018-03-12 22:40:17,641] INFO [ReplicaFetcher replicaId=12, leaderId=0, > fetcherId=1] Stopped (kafka.server.ReplicaFetcherThread){noformat} > It looks like after the ReplicaFetcherThread is stopped, the replicas > start to lag behind, presumably because we are not fetching from the leader > anymore. Further examining the ShutdownableThread.scala object: > {noformat} > override def run(): Unit = { > info("Starting") > try { >while (isRunning) > doWork() > } catch { >case e: FatalExitError => > shutdownInitiated.countDown() > shutdownComplete.countDown() > info("Stopped") > Exit.exit(e.statusCode()) >case e: Throwable => > if (isRunning) >error("Error due to", e) > } finally { >shutdownComplete.countDown() > } > info("Stopped") > }{noformat} > For the Throwable (non-fatal) case, it just exits the while loop and the > thread stops doing work. I am not sure whether this is the intended behavior >
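In other words, the PR's proposal is that recoverable errors be handled inside the work loop rather than escaping it and killing the thread. A Java sketch of that shape (the class and exception names below are hypothetical; the real code is Scala in AbstractFetcherThread/ShutdownableThread):
{code:java}
public abstract class ShutdownableWorker implements Runnable {
    // Hypothetical stand-in for an error the fetcher can recover from,
    // e.g. OffsetOutOfRangeException.
    public static class RecoverableException extends RuntimeException {}

    private volatile boolean running = true;

    protected abstract void doWork();

    public void initiateShutdown() { running = false; }

    @Override
    public void run() {
        while (running) {
            try {
                doWork();
            } catch (RecoverableException e) {
                // Log and keep fetching: the proposed fix catches this inside
                // the loop so the replica fetcher thread stays alive.
            } catch (RuntimeException e) {
                // Today an unexpected non-fatal error ends up here, the loop
                // exits, and the thread silently stops doing work.
                break;
            }
        }
    }
}
{code}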
[jira] [Commented] (KAFKA-6500) Can not build aggregatedJavadoc
[ https://issues.apache.org/jira/browse/KAFKA-6500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16398033#comment-16398033 ] Jimin Hsieh commented on KAFKA-6500: Could you give more detail? Which version or commit of Kafka did you try to build? I tried {code:bash} ./gradlew aggregatedJavadoc {code} with openjdk 7 & 8 and gradle 4.6 without any issues. > Can not build aggregatedJavadoc > --- > > Key: KAFKA-6500 > URL: https://issues.apache.org/jira/browse/KAFKA-6500 > Project: Kafka > Issue Type: Bug > Components: documentation >Reporter: Pavel Timofeev >Priority: Minor > > I'm trying to build kafka according to the instructions provided on github - > https://github.com/apache/kafka > I followed the instructions and managed to build a jar with > {code} > ./gradlew jar > {code} > But I'm unable to build the aggregated javadoc with > {code} > ./gradlew aggregatedJavadoc > {code} > Neither with openjdk 1.7 nor 1.8. > It tells me > {noformat} > FAILURE: Build failed with an exception. > * What went wrong: > A problem was found with the configuration of task ':aggregatedJavadoc'. > > No value has been specified for property 'outputDirectory'. > {noformat} > Are the instructions missing some steps? > Is it a bug in the build process? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6651) SchemaBuilder should not allow Arrays or Maps to be created by type()
[ https://issues.apache.org/jira/browse/KAFKA-6651?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16397962#comment-16397962 ] huxihx commented on KAFKA-6651: --- Instead of throwing exceptions, we could possibly add a couple of `setters` for `keySchema` and `valueSchema`. > SchemaBuilder should not allow Arrays or Maps to be created by type() > - > > Key: KAFKA-6651 > URL: https://issues.apache.org/jira/browse/KAFKA-6651 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Reporter: Jeremy Custenborder >Priority: Minor > > The following code should throw an exception because we cannot set > valueSchema() or keySchema() once the builder is returned. > {code:java} > SchemaBuilder.type(Schema.Type.ARRAY); > SchemaBuilder.type(Schema.Type.MAP);{code} > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
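A sketch of the guard the reporter implies (the wrapper below is illustrative, not the shipped Connect code; SchemaBuilder.array() and SchemaBuilder.map() are the existing factories that do accept the nested schemas):
{code:java}
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;

public class CheckedSchemaBuilder {
    public static SchemaBuilder type(Schema.Type type) {
        if (type == Schema.Type.ARRAY || type == Schema.Type.MAP) {
            // Key/value schemas cannot be attached after the builder is
            // returned, so these types must go through the dedicated factories.
            throw new IllegalArgumentException(
                "Use SchemaBuilder.array(valueSchema) or SchemaBuilder.map(keySchema, valueSchema)");
        }
        return SchemaBuilder.type(type);
    }
}
{code}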
[jira] [Created] (KAFKA-6652) The controller should log failed attempts to transition a replica to OfflineReplica state
Lucas Wang created KAFKA-6652: - Summary: The controller should log failed attempts to transition a replica to OfflineReplica state Key: KAFKA-6652 URL: https://issues.apache.org/jira/browse/KAFKA-6652 Project: Kafka Issue Type: Improvement Reporter: Lucas Wang Assignee: Lucas Wang In certain conditions, the controller's attempt to transition a replica to OfflineReplica state could fail, e.g. the condition described in [KAFKA-6650|https://issues.apache.org/jira/browse/KAFKA-6650]. When that happens, there should be logs to indicate the failed state transitions. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6052) Windows: Consumers not polling when isolation.level=read_committed
[ https://issues.apache.org/jira/browse/KAFKA-6052?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16397789#comment-16397789 ] ASF GitHub Bot commented on KAFKA-6052: --- vahidhashemian opened a new pull request #4705: KAFKA-6052: Fix the request retry issue (on Windows) in InterBrokerSendThread URL: https://github.com/apache/kafka/pull/4705 This resolves the issue detected on Windows. It's a follow-up to the investigation done by @hachikuji (details on the JIRA). ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Windows: Consumers not polling when isolation.level=read_committed > --- > > Key: KAFKA-6052 > URL: https://issues.apache.org/jira/browse/KAFKA-6052 > Project: Kafka > Issue Type: Bug > Components: consumer, producer >Affects Versions: 0.11.0.0 > Environment: Windows 10. All processes running in embedded mode. >Reporter: Ansel Zandegran >Priority: Major > Labels: windows > Attachments: Prducer_Consumer.log, Separate_Logs.zip, kafka-logs.zip, > logFile.log > > > *The same code is running fine in Linux.* I am trying to send a transactional > record with exactly-once semantics. These are my producer, consumer and > broker setups. > public void sendWithTTemp(String topic, EHEvent event) { > Properties props = new Properties(); > props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, > "localhost:9092,localhost:9093,localhost:9094"); > //props.put("bootstrap.servers", > "34.240.248.190:9092,52.50.95.30:9092,52.50.95.30:9092"); > props.put(ProducerConfig.ACKS_CONFIG, "all"); > props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); > props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1"); > props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384); > props.put(ProducerConfig.LINGER_MS_CONFIG, 1); > props.put("transactional.id", "TID" + transactionId.incrementAndGet()); > props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, "5000"); > Producer<String, String> producer = > new KafkaProducer<>(props, > new StringSerializer(), > new StringSerializer()); > Logger.log(this, "Initializing transaction..."); > producer.initTransactions(); > Logger.log(this, "Initializing done."); > try { > Logger.log(this, "Begin transaction..."); > producer.beginTransaction(); > Logger.log(this, "Begin transaction done."); > Logger.log(this, "Sending events..."); > producer.send(new ProducerRecord<>(topic, > event.getKey().toString(), > event.getValue().toString())); > Logger.log(this, "Sending events done."); > Logger.log(this, "Committing..."); > producer.commitTransaction(); > Logger.log(this, "Committing done."); > } catch (ProducerFencedException | OutOfOrderSequenceException > | AuthorizationException e) { > producer.close(); > e.printStackTrace(); > } catch (KafkaException e) { > producer.abortTransaction(); > e.printStackTrace(); > } > producer.close(); > } > *In Consumer* > I have set isolation.level=read_committed > *In 3 Brokers* > I'm running with the following properties > Properties props = new Properties(); > props.setProperty("broker.id", "" + i); > props.setProperty("listeners", "PLAINTEXT://:909" + (2 + i)); > props.setProperty("log.dirs", Configuration.KAFKA_DATA_PATH + "\\B" + > i); > props.setProperty("num.partitions", "1"); > props.setProperty("zookeeper.connect", "localhost:2181"); > props.setProperty("zookeeper.connection.timeout.ms", "6000"); > props.setProperty("min.insync.replicas", "2"); > props.setProperty("offsets.topic.replication.factor", "2"); > props.setProperty("offsets.topic.num.partitions", "1"); > props.setProperty("transaction.state.log.num.partitions", "2"); > props.setProperty("transaction.state.log.replication.factor", "2"); > props.setProperty("transaction.state.log.min.isr", "2"); > I am not getting any records in the consumer. When I set > isolation.level=read_uncommitted, I get the records. I assume that the > records are not getting
[jira] [Created] (KAFKA-6651) SchemaBuilder should not allow Arrays or Maps to be created by type()
Jeremy Custenborder created KAFKA-6651: -- Summary: SchemaBuilder should not allow Arrays or Maps to be created by type() Key: KAFKA-6651 URL: https://issues.apache.org/jira/browse/KAFKA-6651 Project: Kafka Issue Type: Bug Components: KafkaConnect Reporter: Jeremy Custenborder The following code should throw an exception because we cannot set valueSchema() or keySchema() once the builder is returned. {code:java} SchemaBuilder.type(Schema.Type.ARRAY); SchemaBuilder.type(Schema.Type.MAP);{code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-6650) The controller should be able to handle a partially deleted topic
Lucas Wang created KAFKA-6650: - Summary: The controller should be able to handle a partially deleted topic Key: KAFKA-6650 URL: https://issues.apache.org/jira/browse/KAFKA-6650 Project: Kafka Issue Type: Bug Reporter: Lucas Wang Assignee: Lucas Wang A previous controller could have deleted some partitions of a topic from ZK, but not all partitions, and then died. In that case, the new controller should be able to handle the partially deleted topic, and finish the deletion. In the current code base, if there is no leadership info for a replica's partition, the transition to OfflineReplica state for the replica will fail. Afterwards the transition to ReplicaDeletionStarted will fail as well since the only valid previous state for ReplicaDeletionStarted is OfflineReplica. Furthermore, it means the topic deletion will never finish. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
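To make the failure mode concrete: ReplicaDeletionStarted accepts only OfflineReplica as its previous state, so a single failed transition wedges deletion forever. A Java sketch of that rule (a hypothetical rendering; the real state machine is Scala in kafka.controller, and only the relevant entry is shown):
{code:java}
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

public class ReplicaStateRules {
    public enum State { NEW_REPLICA, ONLINE_REPLICA, OFFLINE_REPLICA, REPLICA_DELETION_STARTED }

    private static final Map<State, Set<State>> VALID_PREVIOUS = new EnumMap<>(State.class);
    static {
        // ReplicaDeletionStarted is reachable only from OfflineReplica. If the
        // OfflineReplica transition failed (e.g. no leadership info in ZK for a
        // partially deleted topic), deletion can never make progress.
        VALID_PREVIOUS.put(State.REPLICA_DELETION_STARTED, EnumSet.of(State.OFFLINE_REPLICA));
    }

    public static boolean isValidTransition(State from, State to) {
        Set<State> allowed = VALID_PREVIOUS.get(to);
        return allowed != null && allowed.contains(from);
    }
}
{code}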
[jira] [Updated] (KAFKA-6649) ReplicaFetcher stopped after non fatal exception is thrown
[ https://issues.apache.org/jira/browse/KAFKA-6649?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Julio Ng updated KAFKA-6649: Affects Version/s: 1.0.0 0.11.0.2 1.0.1 > ReplicaFetcher stopped after non fatal exception is thrown > -- > > Key: KAFKA-6649 > URL: https://issues.apache.org/jira/browse/KAFKA-6649 > Project: Kafka > Issue Type: Bug > Components: replication >Affects Versions: 1.0.0, 0.11.0.2, 1.1.0, 1.0.1 >Reporter: Julio Ng >Priority: Major > > We have seen several under-replicated partitions, usually triggered by topic > creation. After digging in the logs, we see the below: > {noformat} > [2018-03-12 22:40:17,641] ERROR [ReplicaFetcher replicaId=12, leaderId=0, > fetcherId=1] Error due to (kafka.server.ReplicaFetcherThread) > kafka.common.KafkaException: Error processing data for partition > [[TOPIC_NAME_REMOVED]]-84 offset 2098535 > at > kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:204) > at > kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:169) > at scala.Option.foreach(Option.scala:257) > at > kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:169) > at > kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:166) > at > scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) > at > kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply$mcV$sp(AbstractFetcherThread.scala:166) > at > kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:166) > at > kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:166) > at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:250) > at > kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:164) > at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:111) > at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82) > Caused by: org.apache.kafka.common.errors.OffsetOutOfRangeException: Cannot > increment the log start offset to 2098535 of partition > [[TOPIC_NAME_REMOVED]]-84 since it is larger than the high watermark -1 > [2018-03-12 22:40:17,641] INFO [ReplicaFetcher replicaId=12, leaderId=0, > fetcherId=1] Stopped (kafka.server.ReplicaFetcherThread){noformat} > It looks like after the ReplicaFetcherThread is stopped, the replicas > start to lag behind, presumably because we are not fetching from the leader > anymore. Further examining the ShutdownableThread.scala object: > {noformat} > override def run(): Unit = { > info("Starting") > try { >while (isRunning) > doWork() > } catch { >case e: FatalExitError => > shutdownInitiated.countDown() > shutdownComplete.countDown() > info("Stopped") > Exit.exit(e.statusCode()) >case e: Throwable => > if (isRunning) >error("Error due to", e) > } finally { >shutdownComplete.countDown() > } > info("Stopped") > }{noformat} > For the Throwable (non-fatal) case, it just exits the while loop and the > thread stops doing work. I am not sure whether this is the intended behavior > of the ShutdownableThread, or whether the exception should be caught and we > should keep calling doWork() > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-6649) ReplicaFetcher stopped after non fatal exception is thrown
Julio Ng created KAFKA-6649: --- Summary: ReplicaFetcher stopped after non fatal exception is thrown Key: KAFKA-6649 URL: https://issues.apache.org/jira/browse/KAFKA-6649 Project: Kafka Issue Type: Bug Components: replication Affects Versions: 1.1.0 Reporter: Julio Ng We have seen several under-replicated partitions, usually triggered by topic creation. After digging in the logs, we see the below:
{noformat}
[2018-03-12 22:40:17,641] ERROR [ReplicaFetcher replicaId=12, leaderId=0, fetcherId=1] Error due to (kafka.server.ReplicaFetcherThread)
kafka.common.KafkaException: Error processing data for partition [[TOPIC_NAME_REMOVED]]-84 offset 2098535
 at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:204)
 at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:169)
 at scala.Option.foreach(Option.scala:257)
 at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:169)
 at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:166)
 at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
 at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
 at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply$mcV$sp(AbstractFetcherThread.scala:166)
 at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:166)
 at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:166)
 at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:250)
 at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:164)
 at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:111)
 at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)
Caused by: org.apache.kafka.common.errors.OffsetOutOfRangeException: Cannot increment the log start offset to 2098535 of partition [[TOPIC_NAME_REMOVED]]-84 since it is larger than the high watermark -1
[2018-03-12 22:40:17,641] INFO [ReplicaFetcher replicaId=12, leaderId=0, fetcherId=1] Stopped (kafka.server.ReplicaFetcherThread)
{noformat}
It looks like after the ReplicaFetcherThread is stopped, the replicas start to lag behind, presumably because we are not fetching from the leader anymore. Further examining the ShutdownableThread.scala object:
{noformat}
override def run(): Unit = {
  info("Starting")
  try {
    while (isRunning)
      doWork()
  } catch {
    case e: FatalExitError =>
      shutdownInitiated.countDown()
      shutdownComplete.countDown()
      info("Stopped")
      Exit.exit(e.statusCode())
    case e: Throwable =>
      if (isRunning)
        error("Error due to", e)
  } finally {
    shutdownComplete.countDown()
  }
  info("Stopped")
}
{noformat}
For the Throwable (non-fatal) case, it just exits the while loop and the thread stops doing work. I am not sure whether this is the intended behavior of the ShutdownableThread, or whether the exception should be caught and we should keep calling doWork() -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6647) KafkaStreams.cleanUp creates .lock file in directory its trying to clean
[ https://issues.apache.org/jira/browse/KAFKA-6647?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16397437#comment-16397437 ] George Bloggs commented on KAFKA-6647: -- Hi. It does remove the lock file in the finally block, but that is too late: the attempt to delete the directory has already failed with the exception, because the lock file still existed in the directory it's trying to delete. > KafkaStreams.cleanUp creates .lock file in directory its trying to clean > > > Key: KAFKA-6647 > URL: https://issues.apache.org/jira/browse/KAFKA-6647 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 1.0.1 > Environment: windows 10. > java version "1.8.0_162" > Java(TM) SE Runtime Environment (build 1.8.0_162-b12) > Java HotSpot(TM) 64-Bit Server VM (build 25.162-b12, mixed mode) > org.apache.kafka:kafka-streams:1.0.1 > Kafka commitId : c0518aa65f25317e >Reporter: George Bloggs >Priority: Minor > Labels: streams > > When calling kafkaStreams.cleanUp() before starting a stream the > StateDirectory.cleanRemovedTasks() method contains this check: > {code:java} > ... Line 240 > if (lock(id, 0)) { > long now = time.milliseconds(); > long lastModifiedMs = taskDir.lastModified(); > if (now > lastModifiedMs + cleanupDelayMs) { > log.info("{} Deleting obsolete state directory {} > for task {} as {}ms has elapsed (cleanup delay is {}ms)", logPrefix(), > dirName, id, now - lastModifiedMs, cleanupDelayMs); > Utils.delete(taskDir); > } > } > {code} > The check for lock(id,0) will create a .lock file in the directory that > subsequently is going to be deleted. If the .lock file already exists from a > previous run the attempt to delete the .lock file fails with > AccessDeniedException. > This leaves the .lock file in the taskDir. Calling Utils.delete(taskDir) will > then attempt to remove the taskDir path calling Files.delete(path). > The call to files.delete(path) in postVisitDirectory will then fail > java.nio.file.DirectoryNotEmptyException as the failed attempt to delete the > .lock file left the directory not empty. (o.a.k.s.p.internals.StateDirectory > : stream-thread [restartedMain] Failed to lock the state directory due > to an unexpected exception) > This seems to then cause issues using streams from a topic to an inMemory > store. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6647) KafkaStreams.cleanUp creates .lock file in directory its trying to clean
[ https://issues.apache.org/jira/browse/KAFKA-6647?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16397429#comment-16397429 ] ASF GitHub Bot commented on KAFKA-6647: --- tedyu opened a new pull request #4702: KAFKA-6647 KafkaStreams.cleanUp creates .lock file in directory it tries to clean URL: https://github.com/apache/kafka/pull/4702 Specify StandardOpenOption#DELETE_ON_CLOSE when creating the FileChannel. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > KafkaStreams.cleanUp creates .lock file in directory its trying to clean > > > Key: KAFKA-6647 > URL: https://issues.apache.org/jira/browse/KAFKA-6647 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 1.0.1 > Environment: windows 10. > java version "1.8.0_162" > Java(TM) SE Runtime Environment (build 1.8.0_162-b12) > Java HotSpot(TM) 64-Bit Server VM (build 25.162-b12, mixed mode) > org.apache.kafka:kafka-streams:1.0.1 > Kafka commitId : c0518aa65f25317e >Reporter: George Bloggs >Priority: Minor > Labels: streams > > When calling kafkaStreams.cleanUp() before starting a stream the > StateDirectory.cleanRemovedTasks() method contains this check: > {code:java} > ... Line 240 > if (lock(id, 0)) { > long now = time.milliseconds(); > long lastModifiedMs = taskDir.lastModified(); > if (now > lastModifiedMs + cleanupDelayMs) { > log.info("{} Deleting obsolete state directory {} > for task {} as {}ms has elapsed (cleanup delay is {}ms)", logPrefix(), > dirName, id, now - lastModifiedMs, cleanupDelayMs); > Utils.delete(taskDir); > } > } > {code} > The check for lock(id,0) will create a .lock file in the directory that > subsequently is going to be deleted. If the .lock file already exists from a > previous run the attempt to delete the .lock file fails with > AccessDeniedException. > This leaves the .lock file in the taskDir. Calling Utils.delete(taskDir) will > then attempt to remove the taskDir path calling Files.delete(path). > The call to files.delete(path) in postVisitDirectory will then fail > java.nio.file.DirectoryNotEmptyException as the failed attempt to delete the > .lock file left the directory not empty. (o.a.k.s.p.internals.StateDirectory > : stream-thread [restartedMain] Failed to lock the state directory due > to an unexpected exception) > This seems to then cause issues using streams from a topic to an inMemory > store. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
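The idea of the PR, sketched in plain NIO (the path below is illustrative and its parent directory is assumed to exist): open the lock file with DELETE_ON_CLOSE so that closing the channel removes the file, leaving the task directory empty for the subsequent delete.
{code:java}
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;

public class LockThenClean {
    public static void main(String[] args) throws IOException {
        Path lockFile = Paths.get("/tmp/kafka-streams/app/0_0/.lock"); // illustrative
        try (FileChannel channel = FileChannel.open(lockFile,
                StandardOpenOption.CREATE,
                StandardOpenOption.WRITE,
                StandardOpenOption.DELETE_ON_CLOSE)) {
            FileLock lock = channel.tryLock();
            if (lock != null) {
                // ... clean the task directory while holding the lock ...
                lock.release();
            }
        }
        // Closing the channel removed the .lock file, so deleting the (now
        // empty) task directory no longer hits DirectoryNotEmptyException.
    }
}
{code}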
[jira] [Updated] (KAFKA-5697) StreamThread.shutdown() need to interrupt the stream threads to break the loop
[ https://issues.apache.org/jira/browse/KAFKA-5697?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-5697: --- Description: In {{StreamThread.shutdown()}} we currently do nothing but set the state, hoping the stream thread may eventually check it and shut down itself. However, under certain scenarios the thread may get blocked within a single loop and hence will never check on this state enum. For example, its {{consumer.poll}} call triggers {{ensureCoordinatorReady()}} which will block until the coordinator can be found. If the coordinator broker is never up and running then the Stream instance will be blocked forever. A simple way to produce this issue is to start the word count demo without starting the ZK / Kafka broker, and then it will get stuck in a single loop and even `ctrl-C` will not stop it since its set state will never be read by the thread:
{code:java}
[2017-08-03 15:17:39,981] WARN Connection to node -1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2017-08-03 15:17:40,046] WARN Connection to node -1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2017-08-03 15:17:40,101] WARN Connection to node -1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2017-08-03 15:17:40,206] WARN Connection to node -1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2017-08-03 15:17:40,261] WARN Connection to node -1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2017-08-03 15:17:40,366] WARN Connection to node -1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2017-08-03 15:17:40,472] WARN Connection to node -1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
^C[2017-08-03 15:17:40,580] WARN Connection to node -1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
{code}
was: In {{StreamThread.close()}} we currently do nothing but set the state, hoping the stream thread may eventually check it and shut down itself. However, under certain scenarios the thread may get blocked within a single loop and hence will never check on this state enum. For example, its {{consumer.poll}} call triggers {{ensureCoordinatorReady()}} which will block until the coordinator can be found. If the coordinator broker is never up and running then the Stream instance will be blocked forever. A simple way to produce this issue is to start the word count demo without starting the ZK / Kafka broker, and then it will get stuck in a single loop and even `ctrl-C` will not stop it since its set state will never be read by the thread:
{code:java}
[2017-08-03 15:17:39,981] WARN Connection to node -1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2017-08-03 15:17:40,046] WARN Connection to node -1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2017-08-03 15:17:40,101] WARN Connection to node -1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2017-08-03 15:17:40,206] WARN Connection to node -1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2017-08-03 15:17:40,261] WARN Connection to node -1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2017-08-03 15:17:40,366] WARN Connection to node -1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2017-08-03 15:17:40,472] WARN Connection to node -1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
^C[2017-08-03 15:17:40,580] WARN Connection to node -1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
{code}
> StreamThread.shutdown() need to interrupt the stream threads to break the loop > -- > > Key: KAFKA-5697 > URL: https://issues.apache.org/jira/browse/KAFKA-5697 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Guozhang Wang >Assignee: John Roesler >Priority: Major > Labels: newbie > > In {{StreamThread.shutdown()}} we currently do nothing but set the state, > hoping the stream thread may eventually check it and shut down itself. > However, under certain scenarios the thread may get blocked within a single > loop and hence will never check on this state enum. For example, its > {{consumer.poll}} call
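For reference, the consumer already offers a thread-safe escape hatch from a blocking poll(): KafkaConsumer.wakeup(), which may be called from any thread and makes a blocked poll() throw WakeupException. A sketch of how a shutdown path could use it (not the actual Streams shutdown code):
{code:java}
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;

public class InterruptiblePollLoop {
    private final AtomicBoolean shuttingDown = new AtomicBoolean(false);
    private final KafkaConsumer<String, String> consumer;

    public InterruptiblePollLoop(KafkaConsumer<String, String> consumer) {
        this.consumer = consumer;
    }

    // May be called from any thread; makes a blocked poll() throw WakeupException.
    public void shutdown() {
        shuttingDown.set(true);
        consumer.wakeup(); // breaks out even while blocked in coordinator discovery
    }

    public void runLoop() {
        try {
            while (!shuttingDown.get()) {
                consumer.poll(100); // would otherwise block in ensureCoordinatorReady()
                // ... process records ...
            }
        } catch (WakeupException e) {
            // Expected on shutdown; fall through to close.
        } finally {
            consumer.close();
        }
    }
}
{code}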
[jira] [Assigned] (KAFKA-6376) Improve Streams metrics for skipped records
[ https://issues.apache.org/jira/browse/KAFKA-6376?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler reassigned KAFKA-6376: --- Assignee: John Roesler > Improve Streams metrics for skipped records > --- > > Key: KAFKA-6376 > URL: https://issues.apache.org/jira/browse/KAFKA-6376 > Project: Kafka > Issue Type: Bug > Components: metrics, streams >Affects Versions: 1.0.0 >Reporter: Matthias J. Sax >Assignee: John Roesler >Priority: Major > Labels: needs-kip > > Copied from the KIP-210 discussion thread: > {quote} > Note that currently we have two metrics for `skipped-records` on different > levels: > 1) on the highest level, the thread-level, we have a `skipped-records` metric, > which records all the skipped records due to deserialization errors. > 2) on the lower processor-node level, we have a > `skippedDueToDeserializationError` metric, which records the skipped records on > that specific source node due to deserialization errors. > So you can see that 1) does not cover any other scenarios and can just be > thought of as an aggregate of 2) across all the tasks' source nodes. > However, there are other places that can cause a record to be dropped, for > example: > 1) https://issues.apache.org/jira/browse/KAFKA-5784: records could be > dropped due to window elapsed. > 2) KIP-210: records could be dropped on the producer side. > 3) records could be dropped during user-customized processing on errors. > {quote} > [~guozhang] Not sure what you mean by "3) records could be dropped during > user-customized processing on errors." > Btw: we also drop records with {{null}} key and/or value for certain DSL > operations. This should be included as well. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-5697) StreamThread.shutdown() need to interrupt the stream threads to break the loop
[ https://issues.apache.org/jira/browse/KAFKA-5697?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-5697: --- Summary: StreamThread.shutdown() need to interrupt the stream threads to break the loop (was: StreamThread.close() need to interrupt the stream threads to break the loop) > StreamThread.shutdown() need to interrupt the stream threads to break the loop > -- > > Key: KAFKA-5697 > URL: https://issues.apache.org/jira/browse/KAFKA-5697 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Guozhang Wang >Assignee: John Roesler >Priority: Major > Labels: newbie > > In {{StreamThread.close()}} we currently do nothing but set the state, hoping > the stream thread may eventually check it and shut down itself. However, under > certain scenarios the thread may get blocked within a single loop and hence > will never check on this state enum. For example, its {{consumer.poll}} call > triggers {{ensureCoordinatorReady()}} which will block until the coordinator > can be found. If the coordinator broker is never up and running then the > Stream instance will be blocked forever. > A simple way to produce this issue is to start the word count demo without > starting the ZK / Kafka broker, and then it will get stuck in a single loop > and even `ctrl-C` will not stop it since its set state will never be read by > the thread: > {code:java} > [2017-08-03 15:17:39,981] WARN Connection to node -1 could not be > established. Broker may not be available. > (org.apache.kafka.clients.NetworkClient) > [2017-08-03 15:17:40,046] WARN Connection to node -1 could not be > established. Broker may not be available. > (org.apache.kafka.clients.NetworkClient) > [2017-08-03 15:17:40,101] WARN Connection to node -1 could not be > established. Broker may not be available. > (org.apache.kafka.clients.NetworkClient) > [2017-08-03 15:17:40,206] WARN Connection to node -1 could not be > established. Broker may not be available. > (org.apache.kafka.clients.NetworkClient) > [2017-08-03 15:17:40,261] WARN Connection to node -1 could not be > established. Broker may not be available. > (org.apache.kafka.clients.NetworkClient) > [2017-08-03 15:17:40,366] WARN Connection to node -1 could not be > established. Broker may not be available. > (org.apache.kafka.clients.NetworkClient) > [2017-08-03 15:17:40,472] WARN Connection to node -1 could not be > established. Broker may not be available. > (org.apache.kafka.clients.NetworkClient) > ^C[2017-08-03 15:17:40,580] WARN Connection to node -1 could not be > established. Broker may not be available. > (org.apache.kafka.clients.NetworkClient) > {code} > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (KAFKA-5697) StreamThread.close() need to interrupt the stream threads to break the loop
[ https://issues.apache.org/jira/browse/KAFKA-5697?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler reassigned KAFKA-5697: --- Assignee: John Roesler > StreamThread.close() need to interrupt the stream threads to break the loop > --- > > Key: KAFKA-5697 > URL: https://issues.apache.org/jira/browse/KAFKA-5697 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Guozhang Wang >Assignee: John Roesler >Priority: Major > Labels: newbie > > In {{StreamThread.close()}} we currently do nothing but set the state, hoping > the stream thread may eventually check it and shut down itself. However, under > certain scenarios the thread may get blocked within a single loop and hence > will never check on this state enum. For example, its {{consumer.poll}} call > triggers {{ensureCoordinatorReady()}} which will block until the coordinator > can be found. If the coordinator broker is never up and running then the > Stream instance will be blocked forever. > A simple way to produce this issue is to start the word count demo without > starting the ZK / Kafka broker, and then it will get stuck in a single loop > and even `ctrl-C` will not stop it since its set state will never be read by > the thread: > {code:java} > [2017-08-03 15:17:39,981] WARN Connection to node -1 could not be > established. Broker may not be available. > (org.apache.kafka.clients.NetworkClient) > [2017-08-03 15:17:40,046] WARN Connection to node -1 could not be > established. Broker may not be available. > (org.apache.kafka.clients.NetworkClient) > [2017-08-03 15:17:40,101] WARN Connection to node -1 could not be > established. Broker may not be available. > (org.apache.kafka.clients.NetworkClient) > [2017-08-03 15:17:40,206] WARN Connection to node -1 could not be > established. Broker may not be available. > (org.apache.kafka.clients.NetworkClient) > [2017-08-03 15:17:40,261] WARN Connection to node -1 could not be > established. Broker may not be available. > (org.apache.kafka.clients.NetworkClient) > [2017-08-03 15:17:40,366] WARN Connection to node -1 could not be > established. Broker may not be available. > (org.apache.kafka.clients.NetworkClient) > [2017-08-03 15:17:40,472] WARN Connection to node -1 could not be > established. Broker may not be available. > (org.apache.kafka.clients.NetworkClient) > ^C[2017-08-03 15:17:40,580] WARN Connection to node -1 could not be > established. Broker may not be available. > (org.apache.kafka.clients.NetworkClient) > {code} > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (KAFKA-4730) Streams does not have an in-memory windowed store
[ https://issues.apache.org/jira/browse/KAFKA-4730?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler reassigned KAFKA-4730: --- Assignee: John Roesler (was: Nikki Thean) > Streams does not have an in-memory windowed store > - > > Key: KAFKA-4730 > URL: https://issues.apache.org/jira/browse/KAFKA-4730 > Project: Kafka > Issue Type: New Feature > Components: streams >Affects Versions: 0.10.2.0 >Reporter: Eno Thereska >Assignee: John Roesler >Priority: Major > > Streams has windowed persistent stores (e.g., see the PersistentKeyValueFactory > interface with its "windowed" method); however, it does not allow for windowed > in-memory stores (e.g., see the InMemoryKeyValueFactory interface). > In addition to the interface not allowing it, Streams does not actually have > an implementation of an in-memory windowed store. > The implication is that operations that require windowing cannot use > in-memory stores. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
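For contrast, this is what the persistent windowed path looks like in the later Stores builder API, with a guess at what an in-memory counterpart might be named; the inMemoryWindowStore line is purely hypothetical and does not exist at the time of this ticket (all values are placeholders):
{code:java}
import java.util.concurrent.TimeUnit;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.WindowStore;

public class WindowStores {
    public static StoreBuilder<WindowStore<String, Long>> persistent() {
        // Exists today: RocksDB-backed windowed store.
        return Stores.windowStoreBuilder(
                Stores.persistentWindowStore("counts",
                        TimeUnit.HOURS.toMillis(1),   // retention
                        3,                            // segments
                        TimeUnit.MINUTES.toMillis(5), // window size
                        false),                       // retain duplicates
                Serdes.String(), Serdes.Long());
    }

    // Requested by this ticket (hypothetical, not a shipped API):
    // Stores.inMemoryWindowStore("counts", retentionMs, windowSizeMs, false)
}
{code}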
[jira] [Updated] (KAFKA-6647) KafkaStreams.cleanUp creates .lock file in directory its trying to clean
[ https://issues.apache.org/jira/browse/KAFKA-6647?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] George Bloggs updated KAFKA-6647: - Environment: windows 10. java version "1.8.0_162" Java(TM) SE Runtime Environment (build 1.8.0_162-b12) Java HotSpot(TM) 64-Bit Server VM (build 25.162-b12, mixed mode) org.apache.kafka:kafka-streams:1.0.1 Kafka commitId : c0518aa65f25317e was: windows 10. java version "1.8.0_162" Java(TM) SE Runtime Environment (build 1.8.0_162-b12) Java HotSpot(TM) 64-Bit Server VM (build 25.162-b12, mixed mode) org.apache.kafka:kafka-streams:1.0.1 > KafkaStreams.cleanUp creates .lock file in directory its trying to clean > > > Key: KAFKA-6647 > URL: https://issues.apache.org/jira/browse/KAFKA-6647 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 1.0.1 > Environment: windows 10. > java version "1.8.0_162" > Java(TM) SE Runtime Environment (build 1.8.0_162-b12) > Java HotSpot(TM) 64-Bit Server VM (build 25.162-b12, mixed mode) > org.apache.kafka:kafka-streams:1.0.1 > Kafka commitId : c0518aa65f25317e >Reporter: George Bloggs >Priority: Minor > Labels: streams > > When calling kafkaStreams.cleanUp() before starting a stream the > StateDirectory.cleanRemovedTasks() method contains this check: > {code:java} > ... Line 240 > if (lock(id, 0)) { > long now = time.milliseconds(); > long lastModifiedMs = taskDir.lastModified(); > if (now > lastModifiedMs + cleanupDelayMs) { > log.info("{} Deleting obsolete state directory {} > for task {} as {}ms has elapsed (cleanup delay is {}ms)", logPrefix(), > dirName, id, now - lastModifiedMs, cleanupDelayMs); > Utils.delete(taskDir); > } > } > {code} > The check for lock(id,0) will create a .lock file in the directory that > subsequently is going to be deleted. If the .lock file already exists from a > previous run the attempt to delete the .lock file fails with > AccessDeniedException. > This leaves the .lock file in the taskDir. Calling Utils.delete(taskDir) will > then attempt to remove the taskDir path calling Files.delete(path). > The call to files.delete(path) in postVisitDirectory will then fail > java.nio.file.DirectoryNotEmptyException as the failed attempt to delete the > .lock file left the directory not empty. (o.a.k.s.p.internals.StateDirectory > : stream-thread [restartedMain] Failed to lock the state directory due > to an unexpected exception) > This seems to then cause issues using streams from a topic to an inMemory > store. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-6647) KafkaStreams.cleanUp creates .lock file in directory its trying to clean
[ https://issues.apache.org/jira/browse/KAFKA-6647?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] George Bloggs updated KAFKA-6647: - Description: When calling kafkaStreams.cleanUp() before starting a stream the StateDirectory.cleanRemovedTasks() method contains this check: {code:java} ... Line 240 if (lock(id, 0)) { long now = time.milliseconds(); long lastModifiedMs = taskDir.lastModified(); if (now > lastModifiedMs + cleanupDelayMs) { log.info("{} Deleting obsolete state directory {} for task {} as {}ms has elapsed (cleanup delay is {}ms)", logPrefix(), dirName, id, now - lastModifiedMs, cleanupDelayMs); Utils.delete(taskDir); } } {code} The check for lock(id,0) will create a .lock file in the directory that subsequently is going to be deleted. If the .lock file already exists from a previous run the attempt to delete the .lock file fails with AccessDeniedException. This leaves the .lock file in the taskDir. Calling Utils.delete(taskDir) will then attempt to remove the taskDir path calling Files.delete(path). The call to files.delete(path) in postVisitDirectory will then fail java.nio.file.DirectoryNotEmptyException as the failed attempt to delete the .lock file left the directory not empty. (o.a.k.s.p.internals.StateDirectory : stream-thread [restartedMain] Failed to lock the state directory due to an unexpected exception) This seems to then cause issues using streams from a topic to an inMemory store. was: When calling kafkaStreams.cleanUp() before starting a stream the StateDirectory.cleanRemovedTasks() method contains this check: {code:java} ... Line 240 if (lock(id, 0)) { long now = time.milliseconds(); long lastModifiedMs = taskDir.lastModified(); if (now > lastModifiedMs + cleanupDelayMs) { log.info("{} Deleting obsolete state directory {} for task {} as {}ms has elapsed (cleanup delay is {}ms)", logPrefix(), dirName, id, now - lastModifiedMs, cleanupDelayMs); Utils.delete(taskDir); } } {code} The check for lock(id,0) will create a .lock file in the directory that subsequently is going to be deleted. If the .lock file already exists from a previous run the attempt to delete the .lock file fails with AccessDeniedException. This leaves the .lock file in the taskDir. Calling Utils.delete(taskDir) will then attempt to remove the taskDir path calling Files.delete(path). The call to files.delete(path) in postVisitDirectory will then fail java.nio.file.DirectoryNotEmptyException as the failed attempt to delete the .lock file left the directory not empty. This seems to then cause issues using streams from a topic to an in memory store. > KafkaStreams.cleanUp creates .lock file in directory its trying to clean > > > Key: KAFKA-6647 > URL: https://issues.apache.org/jira/browse/KAFKA-6647 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 1.0.1 > Environment: windows 10. > java version "1.8.0_162" > Java(TM) SE Runtime Environment (build 1.8.0_162-b12) > Java HotSpot(TM) 64-Bit Server VM (build 25.162-b12, mixed mode) > org.apache.kafka:kafka-streams:1.0.1 >Reporter: George Bloggs >Priority: Minor > Labels: streams > > When calling kafkaStreams.cleanUp() before starting a stream the > StateDirectory.cleanRemovedTasks() method contains this check: > {code:java} > ... 
Line 240 > if (lock(id, 0)) { > long now = time.milliseconds(); > long lastModifiedMs = taskDir.lastModified(); > if (now > lastModifiedMs + cleanupDelayMs) { > log.info("{} Deleting obsolete state directory {} > for task {} as {}ms has elapsed (cleanup delay is {}ms)", logPrefix(), > dirName, id, now - lastModifiedMs, cleanupDelayMs); > Utils.delete(taskDir); > } > } > {code} > The check for lock(id,0) will create a .lock file in the directory that > subsequently is going to be deleted. If the .lock file already exists from a > previous run the attempt to delete the .lock file fails with > AccessDeniedException. > This leaves the .lock file in the taskDir. Calling Utils.delete(taskDir) will > then attempt to remove the taskDir path calling Files.delete(path). > The call to files.delete(path) in postVisitDirectory will then fail > java.nio.file.DirectoryNotEmptyException as the failed attempt to delete the > .lock file left the directory not empty. > This seems to then cause issues using streams from a topic to an in memory > store. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6647) KafkaStreams.cleanUp creates .lock file in directory its trying to clean
[ https://issues.apache.org/jira/browse/KAFKA-6647?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16397366#comment-16397366 ] Ted Yu commented on KAFKA-6647: --- The above code is paired with: {code} } finally { try { unlock(id); {code} where unlock() has: {code} lockAndOwner.lock.release(); log.debug("{} Released state dir lock for task {}", logPrefix(), taskId); {code} Wouldn't the above get rid of the lock file ? > KafkaStreams.cleanUp creates .lock file in directory its trying to clean > > > Key: KAFKA-6647 > URL: https://issues.apache.org/jira/browse/KAFKA-6647 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 1.0.1 > Environment: windows 10. > java version "1.8.0_162" > Java(TM) SE Runtime Environment (build 1.8.0_162-b12) > Java HotSpot(TM) 64-Bit Server VM (build 25.162-b12, mixed mode) > org.apache.kafka:kafka-streams:1.0.1 >Reporter: George Bloggs >Priority: Minor > Labels: streams > > When calling kafkaStreams.cleanUp() before starting a stream the > StateDirectory.cleanRemovedTasks() method contains this check: > {code:java} > ... Line 240 > if (lock(id, 0)) { > long now = time.milliseconds(); > long lastModifiedMs = taskDir.lastModified(); > if (now > lastModifiedMs + cleanupDelayMs) { > log.info("{} Deleting obsolete state directory {} > for task {} as {}ms has elapsed (cleanup delay is {}ms)", logPrefix(), > dirName, id, now - lastModifiedMs, cleanupDelayMs); > Utils.delete(taskDir); > } > } > {code} > The check for lock(id,0) will create a .lock file in the directory that > subsequently is going to be deleted. If the .lock file already exists from a > previous run the attempt to delete the .lock file fails with > AccessDeniedException. > This leaves the .lock file in the taskDir. Calling Utils.delete(taskDir) will > then attempt to remove the taskDir path calling Files.delete(path). > The call to files.delete(path) in postVisitDirectory will then fail > java.nio.file.DirectoryNotEmptyException as the failed attempt to delete the > .lock file left the directory not empty. > This seems to then cause issues using streams from a topic to an in memory > store. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
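Releasing the lock is not quite enough on its own: java.nio's FileLock.release() frees the lock but leaves the .lock file itself on disk. A sketch (hypothetical unlock() helper, not the StateDirectory code) of what release does and does not do:
{code:java}
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.file.Files;
import java.nio.file.Path;

public class UnlockSketch {
    // Releasing a FileLock frees the lock but never removes the file itself,
    // so the task directory keeps its .lock entry after unlock() runs.
    static void unlock(FileLock lock, FileChannel channel, Path lockFile) throws IOException {
        lock.release();
        channel.close();
        // Only an explicit delete removes the file -- and on Windows that is
        // exactly the step that fails while any handle is still open:
        Files.deleteIfExists(lockFile);
    }
}
{code}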
[jira] [Updated] (KAFKA-6644) Make Server Info more generic for Kafka Interactive Queries
[ https://issues.apache.org/jira/browse/KAFKA-6644?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-6644: --- Description: when working to implement *kafka streams interactive queries*, i see that i can set `application.server` with `host:port` *i would like a more generic mechanism to set additional properties.* i'm using cloud foundry containers for my kafka streams app. i scale out my containers using `*cf scale*`. each gets its own instance id. the *instance id* can be used in an http header to get cloud foundry to route the http to the correct instance [https://docs.cloudfoundry.org/concepts/http-routing.html#app-instance-routing] i realize, per Matthias J Sax, that Kafka Streams only distributes the information but does not use it. Thus, i can put the instance-id as the port. Changing the config or adding a new one is a public API change and requires a KIP: https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals was: when working to implement *kafka streams interactive queries*, i see that i can set `application.server` with `host:port` *i would like a more generic mechanism to set additional properties.* i'm using cloud foundry containers for my kafka streams app. i scale out my containers using `*cf scale*`. each gets its own instance id. the *instance id* can be used in an http header to get cloud foundry to route the http to the correct instance https://docs.cloudfoundry.org/concepts/http-routing.html#app-instance-routing i realize, per Matthias J Sax, that Kafka Streams only distributes the information but does not use it. Thus, i can put the instance-id as the port. > Make Server Info more generic for Kafka Interactive Queries > --- > > Key: KAFKA-6644 > URL: https://issues.apache.org/jira/browse/KAFKA-6644 > Project: Kafka > Issue Type: Improvement > Components: config, streams >Reporter: Mike Graham >Priority: Minor > Labels: needs-kip, newbie > > when working to implement *kafka streams interactive queries*, i see that i > can set `application.server` with `host:port` > *i would like a more generic mechanism to set additional properties.* > i'm using cloud foundry containers for my kafka streams app. i scale out my > containers using `*cf scale*`. each gets its own instance id. the *instance > id* can be used in an http header to get cloud foundry to route the http to > the correct instance > [https://docs.cloudfoundry.org/concepts/http-routing.html#app-instance-routing] > i realize, per Matthias J Sax, that Kafka Streams only distributes the > information but does not use it. Thus, i can put the instance-id as the port. > Changing the config or adding a new one is a public API change and requires a > KIP: > https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals -- This message was sent by Atlassian JIRA (v7.6.3#76005)
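For reference, a sketch of how `application.server` is set and read back today via the interactive-queries metadata API. The application id, broker address, topic names and endpoint value are placeholders; as noted above, Streams only distributes the value to other instances, it never connects to it:
{code:java}
import java.util.Properties;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.state.HostInfo;
import org.apache.kafka.streams.state.StreamsMetadata;

public class ServerInfoExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");         // hypothetical app id
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092"); // hypothetical broker
        // Distributed to peers but never dialled by Streams itself:
        props.put(StreamsConfig.APPLICATION_SERVER_CONFIG, "instance-7.apps.internal:8080");

        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("input").to("output");
        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();

        // Every instance can then discover its peers' advertised endpoints:
        for (StreamsMetadata metadata : streams.allMetadata()) {
            HostInfo hostInfo = metadata.hostInfo();
            System.out.println(hostInfo.host() + ":" + hostInfo.port());
        }
    }
}
{code}
Since the value must still parse as host:port with an integer port, an instance id can only be encoded in the host part or as a numeric port, which is the limitation this ticket asks to lift.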
[jira] [Updated] (KAFKA-6644) Make Server Info more generic for Kafka Interactive Queries
[ https://issues.apache.org/jira/browse/KAFKA-6644?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-6644: --- Labels: needs-kip newbie (was: ) > Make Server Info more generic for Kafka Interactive Queries > --- > > Key: KAFKA-6644 > URL: https://issues.apache.org/jira/browse/KAFKA-6644 > Project: Kafka > Issue Type: Improvement > Components: config, streams >Reporter: Mike Graham >Priority: Minor > Labels: needs-kip, newbie > > when working to implement *kafka streams interactive queries*, i see that i > can set `application.server` with `host:port` > *i would like a more generic mechanism to set additional properties.* > i'm using cloud foundry containers for my kafka streams app. i scale out my > containers using `*cf scale*`. each gets its own instance id. the *instance > id* can be used in an http header to get cloud foundry to route the http to > the correct instance > https://docs.cloudfoundry.org/concepts/http-routing.html#app-instance-routing > i realize, per Matthias J Sax, that Kafka Streams only distributes the > information but does not use it. Thus, i can put the instance-id as the port. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-5697) StreamThread.close() need to interrupt the stream threads to break the loop
[ https://issues.apache.org/jira/browse/KAFKA-5697?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16397347#comment-16397347 ] Ted Yu commented on KAFKA-5697: --- Looks like StreamThread.close() doesn't exist (after refactoring). > StreamThread.close() need to interrupt the stream threads to break the loop > --- > > Key: KAFKA-5697 > URL: https://issues.apache.org/jira/browse/KAFKA-5697 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Guozhang Wang >Priority: Major > Labels: newbie > > In {{StreamThread.close()}} we currently do nothing but set the state, hoping > the stream thread may eventually check it and shut itself down. However, under > certain scenarios the thread may get blocked within a single loop and hence > will never check on this state enum. For example, its {{consumer.poll}} call > triggers {{ensureCoordinatorReady()}}, which will block until the coordinator > can be found. If the coordinator broker is never up and running then the > Stream instance will be blocked forever. > A simple way to produce this issue is to start the word count demo without > starting the ZK / Kafka broker, and then it will get stuck in a single loop > and even `ctrl-C` will not stop it since its set state will never be read by > the thread: > {code:java} > [2017-08-03 15:17:39,981] WARN Connection to node -1 could not be > established. Broker may not be available. > (org.apache.kafka.clients.NetworkClient) > [2017-08-03 15:17:40,046] WARN Connection to node -1 could not be > established. Broker may not be available. > (org.apache.kafka.clients.NetworkClient) > [2017-08-03 15:17:40,101] WARN Connection to node -1 could not be > established. Broker may not be available. > (org.apache.kafka.clients.NetworkClient) > [2017-08-03 15:17:40,206] WARN Connection to node -1 could not be > established. Broker may not be available. > (org.apache.kafka.clients.NetworkClient) > [2017-08-03 15:17:40,261] WARN Connection to node -1 could not be > established. Broker may not be available. > (org.apache.kafka.clients.NetworkClient) > [2017-08-03 15:17:40,366] WARN Connection to node -1 could not be > established. Broker may not be available. > (org.apache.kafka.clients.NetworkClient) > [2017-08-03 15:17:40,472] WARN Connection to node -1 could not be > established. Broker may not be available. > (org.apache.kafka.clients.NetworkClient) > ^C[2017-08-03 15:17:40,580] WARN Connection to node -1 could not be > established. Broker may not be available. > (org.apache.kafka.clients.NetworkClient) > {code} > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
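Until the loop can be interrupted, one defensive pattern is the timed close() overload available since 1.0.0, which gives up after a timeout instead of blocking forever. A sketch; the watchdog class and the 30-second timeout are invented:
{code:java}
import java.util.concurrent.TimeUnit;
import org.apache.kafka.streams.KafkaStreams;

public class ShutdownWatchdog {
    public static void install(final KafkaStreams streams) {
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            // close(timeout) returns false if the threads did not stop in time,
            // e.g. because one is stuck inside consumer.poll().
            boolean clean = streams.close(30, TimeUnit.SECONDS);
            if (!clean) {
                // Threads are likely wedged in ensureCoordinatorReady();
                // halt() rather than System.exit() because this code already
                // runs inside a shutdown hook.
                Runtime.getRuntime().halt(1);
            }
        }));
    }
}
{code}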
[jira] [Updated] (KAFKA-5697) StreamThread.close() need to interrupt the stream threads to break the loop
[ https://issues.apache.org/jira/browse/KAFKA-5697?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang updated KAFKA-5697: - Labels: newbie (was: ) > StreamThread.close() need to interrupt the stream threads to break the loop > --- > > Key: KAFKA-5697 > URL: https://issues.apache.org/jira/browse/KAFKA-5697 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Guozhang Wang >Priority: Major > Labels: newbie > > In {{StreamThread.close()}} we currently do nothing but set the state, hoping > the stream thread may eventually check it and shut itself down. However, under > certain scenarios the thread may get blocked within a single loop and hence > will never check on this state enum. For example, its {{consumer.poll}} call > triggers {{ensureCoordinatorReady()}}, which will block until the coordinator > can be found. If the coordinator broker is never up and running then the > Stream instance will be blocked forever. > A simple way to produce this issue is to start the word count demo without > starting the ZK / Kafka broker, and then it will get stuck in a single loop > and even `ctrl-C` will not stop it since its set state will never be read by > the thread: > {code:java} > [2017-08-03 15:17:39,981] WARN Connection to node -1 could not be > established. Broker may not be available. > (org.apache.kafka.clients.NetworkClient) > [2017-08-03 15:17:40,046] WARN Connection to node -1 could not be > established. Broker may not be available. > (org.apache.kafka.clients.NetworkClient) > [2017-08-03 15:17:40,101] WARN Connection to node -1 could not be > established. Broker may not be available. > (org.apache.kafka.clients.NetworkClient) > [2017-08-03 15:17:40,206] WARN Connection to node -1 could not be > established. Broker may not be available. > (org.apache.kafka.clients.NetworkClient) > [2017-08-03 15:17:40,261] WARN Connection to node -1 could not be > established. Broker may not be available. > (org.apache.kafka.clients.NetworkClient) > [2017-08-03 15:17:40,366] WARN Connection to node -1 could not be > established. Broker may not be available. > (org.apache.kafka.clients.NetworkClient) > [2017-08-03 15:17:40,472] WARN Connection to node -1 could not be > established. Broker may not be available. > (org.apache.kafka.clients.NetworkClient) > ^C[2017-08-03 15:17:40,580] WARN Connection to node -1 could not be > established. Broker may not be available. > (org.apache.kafka.clients.NetworkClient) > {code} > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6648) Fetcher.getTopicMetadata() only returns "healthy" partitions, not all
[ https://issues.apache.org/jira/browse/KAFKA-6648?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16397306#comment-16397306 ] radai rosenblatt commented on KAFKA-6648: - PR is https://github.com/apache/kafka/pull/4679 > Fetcher.getTopicMetadata() only returns "healthy" partitions, not all > - > > Key: KAFKA-6648 > URL: https://issues.apache.org/jira/browse/KAFKA-6648 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 1.0.0, 0.11.0.2 >Reporter: radai rosenblatt >Assignee: radai rosenblatt >Priority: Major > Fix For: 2.0.0 > > > {code} > if (!shouldRetry) { > HashMap<String, List<PartitionInfo>> topicsPartitionInfos = new > HashMap<>(); > for (String topic : cluster.topics()) > topicsPartitionInfos.put(topic, > cluster.availablePartitionsForTopic(topic)); > return topicsPartitionInfos; > } > {code} > this leads to inconsistent behavior upstream, for example in > KafkaConsumer.partitionsFor(), where if there's valid metadata all partitions > would be returned, whereas if metadata doesn't exist (or has expired) a subset of > partitions (only the healthy ones) would be returned. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
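The asymmetry comes down to which Cluster accessor answers the call. A sketch (not the Fetcher code itself) using the two public accessors involved:
{code:java}
import java.util.List;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;

public class PartitionListings {
    // partitionsForTopic() returns every known partition; the
    // availablePartitionsForTopic() variant returns only those with a live
    // leader, which is what the code quoted above hands back to callers.
    static void compare(Cluster cluster, String topic) {
        List<PartitionInfo> all = cluster.partitionsForTopic(topic);
        List<PartitionInfo> healthy = cluster.availablePartitionsForTopic(topic);
        System.out.printf("topic %s: %d known, %d with a leader%n",
                topic, all.size(), healthy.size());
    }
}
{code}
So a caller of KafkaConsumer.partitionsFor() can see a different partition count depending on whether the answer comes from cached metadata or from the freshly fetched path above.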
[jira] [Updated] (KAFKA-6648) Fetcher.getTopicMetadata() only returns "healthy" partitions, not all
[ https://issues.apache.org/jira/browse/KAFKA-6648?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] radai rosenblatt updated KAFKA-6648: Fix Version/s: 2.0.0 > Fetcher.getTopicMetadata() only returns "healthy" partitions, not all > - > > Key: KAFKA-6648 > URL: https://issues.apache.org/jira/browse/KAFKA-6648 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 1.0.0, 0.11.0.2 >Reporter: radai rosenblatt >Assignee: radai rosenblatt >Priority: Major > Fix For: 2.0.0 > > > {code} > if (!shouldRetry) { > HashMap<String, List<PartitionInfo>> topicsPartitionInfos = new > HashMap<>(); > for (String topic : cluster.topics()) > topicsPartitionInfos.put(topic, > cluster.availablePartitionsForTopic(topic)); > return topicsPartitionInfos; > } > {code} > this leads to inconsistent behavior upstream, for example in > KafkaConsumer.partitionsFor(), where if there's valid metadata all partitions > would be returned, whereas if metadata doesn't exist (or has expired) a subset of > partitions (only the healthy ones) would be returned. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-6648) Fetcher.getTopicMetadata() only returns "healthy" partitions, not all
[ https://issues.apache.org/jira/browse/KAFKA-6648?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] radai rosenblatt updated KAFKA-6648: Affects Version/s: (was: 1.0.1) 1.0.0 0.11.0.2 > Fetcher.getTopicMetadata() only returns "healthy" partitions, not all > - > > Key: KAFKA-6648 > URL: https://issues.apache.org/jira/browse/KAFKA-6648 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 1.0.0, 0.11.0.2 >Reporter: radai rosenblatt >Assignee: radai rosenblatt >Priority: Major > Fix For: 2.0.0 > > > {code} > if (!shouldRetry) { > HashMap<String, List<PartitionInfo>> topicsPartitionInfos = new > HashMap<>(); > for (String topic : cluster.topics()) > topicsPartitionInfos.put(topic, > cluster.availablePartitionsForTopic(topic)); > return topicsPartitionInfos; > } > {code} > this leads to inconsistent behavior upstream, for example in > KafkaConsumer.partitionsFor(), where if there's valid metadata all partitions > would be returned, whereas if metadata doesn't exist (or has expired) a subset of > partitions (only the healthy ones) would be returned. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-6648) Fetcher.getTopicMetadata() only returns "healthy" partitions, not all
radai rosenblatt created KAFKA-6648: --- Summary: Fetcher.getTopicMetadata() only returns "healthy" partitions, not all Key: KAFKA-6648 URL: https://issues.apache.org/jira/browse/KAFKA-6648 Project: Kafka Issue Type: Bug Components: clients Affects Versions: 1.0.1 Reporter: radai rosenblatt Assignee: radai rosenblatt {code} if (!shouldRetry) { HashMap<String, List<PartitionInfo>> topicsPartitionInfos = new HashMap<>(); for (String topic : cluster.topics()) topicsPartitionInfos.put(topic, cluster.availablePartitionsForTopic(topic)); return topicsPartitionInfos; } {code} this leads to inconsistent behavior upstream, for example in KafkaConsumer.partitionsFor(), where if there's valid metadata all partitions would be returned, whereas if metadata doesn't exist (or has expired) a subset of partitions (only the healthy ones) would be returned. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-6647) KafkaStreams.cleanUp creates .lock file in directory its trying to clean
George Bloggs created KAFKA-6647: Summary: KafkaStreams.cleanUp creates .lock file in directory its trying to clean Key: KAFKA-6647 URL: https://issues.apache.org/jira/browse/KAFKA-6647 Project: Kafka Issue Type: Bug Components: streams Affects Versions: 1.0.1 Environment: windows 10. java version "1.8.0_162" Java(TM) SE Runtime Environment (build 1.8.0_162-b12) Java HotSpot(TM) 64-Bit Server VM (build 25.162-b12, mixed mode) org.apache.kafka:kafka-streams:1.0.1 Reporter: George Bloggs When calling kafkaStreams.cleanUp() before starting a stream the StateDirectory.cleanRemovedTasks() method contains this check: {code:java} ... Line 240 if (lock(id, 0)) { long now = time.milliseconds(); long lastModifiedMs = taskDir.lastModified(); if (now > lastModifiedMs + cleanupDelayMs) { log.info("{} Deleting obsolete state directory {} for task {} as {}ms has elapsed (cleanup delay is {}ms)", logPrefix(), dirName, id, now - lastModifiedMs, cleanupDelayMs); Utils.delete(taskDir); } } {code} The check for lock(id,0) will create a .lock file in the directory that subsequently is going to be deleted. If the .lock file already exists from a previous run the attempt to delete the .lock file fails with AccessDeniedException. This leaves the .lock file in the taskDir. Calling Utils.delete(taskDir) will then attempt to remove the taskDir path calling Files.delete(path). The call to files.delete(path) in postVisitDirectory will then fail java.nio.file.DirectoryNotEmptyException as the failed attempt to delete the .lock file left the directory not empty. This seems to then cause issues using streams from a topic to an in memory store. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-6646) Add a GlobalKStream object type for stream event broadcast
Antony Stubbs created KAFKA-6646: Summary: Add a GlobalKStream object type for stream event broadcast Key: KAFKA-6646 URL: https://issues.apache.org/jira/browse/KAFKA-6646 Project: Kafka Issue Type: New Feature Components: streams Affects Versions: 1.1.0 Reporter: Antony Stubbs There are some use cases where having a global KStream object is useful. For example, where a single event is sent, with very low frequency, to a cluster of Kafka stream nodes to trigger all nodes to do some processing of state stored on their instance. Workaround currently is to either create a second kstream app instance, being careful to configure it with a different state dir, and give it a unique app name per instance, then create a kstream in each one. Or - you can use the normal consumer client inside your kstream app with unique consumer groups. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
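A sketch of the second workaround mentioned above: a plain consumer with a unique, per-instance group.id, so that every instance receives every broadcast event. The broker address and topic name are placeholders:
{code:java}
import java.util.Collections;
import java.util.Properties;
import java.util.UUID;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class BroadcastListener {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092"); // hypothetical
        // A unique group per instance means every instance gets every event.
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "broadcast-" + UUID.randomUUID());
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("control-events")); // hypothetical topic
            while (true) {
                for (ConsumerRecord<String, String> record : consumer.poll(1000)) {
                    // react on this instance, e.g. sweep local state stores
                    System.out.println("broadcast event: " + record.value());
                }
            }
        }
    }
}
{code}
A GlobalKStream would fold this plumbing into the Streams instance itself, avoiding the second consumer and the per-instance group bookkeeping.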
[jira] [Commented] (KAFKA-4027) Leader for a cetain partition unavailable forever
[ https://issues.apache.org/jira/browse/KAFKA-4027?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16397208#comment-16397208 ] Manikumar commented on KAFKA-4027: -- More details about similar exception are here: https://issues.apache.org/jira/browse/KAFKA-5758 > Leader for a cetain partition unavailable forever > - > > Key: KAFKA-4027 > URL: https://issues.apache.org/jira/browse/KAFKA-4027 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 0.10.0.0 >Reporter: tu nguyen khac >Priority: Major > > I have a cluster of brokers ( 9 box) , i 'm naming it from 0 --> 8 . > Yesterday some servers went down ( hard reset ) i regularly restart these > server ( down servers ) but after that some topics cannot assign leader > i checked server log and retrieved these logging : > kafka.common.NotAssignedReplicaException: Leader 1 failed to record follower > 6's position -1 since the replica is not recognized to be one of the assigned > replicas 1 for partition [tos_htv3tv.com,31]. > at > kafka.cluster.Partition.updateReplicaLogReadResult(Partition.scala:251) > at > kafka.server.ReplicaManager$$anonfun$updateFollowerLogReadResults$2.apply(ReplicaManager.scala:864) > at > kafka.server.ReplicaManager$$anonfun$updateFollowerLogReadResults$2.apply(ReplicaManager.scala:861) > at > scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:221) > at > scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:428) > at > kafka.server.ReplicaManager.updateFollowerLogReadResults(ReplicaManager.scala:861) > at kafka.server.ReplicaManager.fetchMessages(ReplicaManager.scala:470) > at kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:496) > at kafka.server.KafkaApis.handle(KafkaApis.scala:77) > at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:60) > at java.lang.Thread.run(Thread.java:745) > i tried to run Prefered Leader but it didn't work ( some partitions has node > leader ) :( -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-6645) Host Affinity to facilitate faster restarts of kafka streams applications
[ https://issues.apache.org/jira/browse/KAFKA-6645?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Giridhar Addepalli updated KAFKA-6645: -- Summary: Host Affinity to facilitate faster restarts of kafka streams applications (was: Sticky Partition Assignment to facilitate faster restarts of kafka streams applications) > Host Affinity to facilitate faster restarts of kafka streams applications > - > > Key: KAFKA-6645 > URL: https://issues.apache.org/jira/browse/KAFKA-6645 > Project: Kafka > Issue Type: New Feature > Components: streams >Reporter: Giridhar Addepalli >Priority: Major > > Since Kafka Streams applications have lot of state in the stores in general, > it would be good to remember the assignment of partitions to machines. So > that when whole application is restarted for whatever reason, there is a way > to use past assignment of partitions to machines and there won't be need to > build up state by reading off of changelog kafka topic and would result in > faster start-up. > Samza has support for Host Affinity > ([https://samza.apache.org/learn/documentation/0.14/yarn/yarn-host-affinity.html]) > KIP-54 > ([https://cwiki.apache.org/confluence/display/KAFKA/KIP-54+-+Sticky+Partition+Assignment+Strategy)] > , handles cases where some members of consumer group goes down / comes up, > and KIP-54 ensures there is minimal diff between assignments before and after > rebalance. > But to handle whole restart use case, we need to remember past assignment > somewhere, and use it after restart. > Please let us know if this is already solved problem / some cleaner way of > achieving this objective -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-6645) Sticky Partition Assignment to facilitate faster restarts of kafka streams applications
[ https://issues.apache.org/jira/browse/KAFKA-6645?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Giridhar Addepalli updated KAFKA-6645: -- Description: Since Kafka Streams applications have lot of state in the stores in general, it would be good to remember the assignment of partitions to machines. So that when whole application is restarted for whatever reason, there is a way to use past assignment of partitions to machines and there won't be need to build up state by reading off of changelog kafka topic and would result in faster start-up. Samza has support for Host Affinity ([https://samza.apache.org/learn/documentation/0.14/yarn/yarn-host-affinity.html]) KIP-54 ([https://cwiki.apache.org/confluence/display/KAFKA/KIP-54+-+Sticky+Partition+Assignment+Strategy)] , handles cases where some members of consumer group goes down / comes up, and KIP-54 ensures there is minimal diff between assignments before and after rebalance. But to handle whole restart use case, we need to remember past assignment somewhere, and use it after restart. Please let us know if this is already solved problem / some cleaner way of achieving this objective was: Since Kafka Streams applications have lot of state in the stores in the general, it would be good to remember the assignment of partitions to machines. So that when whole application is restarted for whatever reason, there is a way to use past assignment of partitions to machines and there won't be need to build up state by reading off of changelog kafka topic and would result in faster start-up. Samza has support for Host Affinity (https://samza.apache.org/learn/documentation/0.14/yarn/yarn-host-affinity.html) KIP-54 ([https://cwiki.apache.org/confluence/display/KAFKA/KIP-54+-+Sticky+Partition+Assignment+Strategy)] , handles cases where some members of consumer group goes down / comes up, and KIP-54 ensures there is minimal diff between assignments before and after rebalance. But to handle whole restart use case, we need to remember past assignment somewhere, and use it after restart. Please let us know if this is already solved problem / some cleaner way of achieving this objective > Sticky Partition Assignment to facilitate faster restarts of kafka streams > applications > --- > > Key: KAFKA-6645 > URL: https://issues.apache.org/jira/browse/KAFKA-6645 > Project: Kafka > Issue Type: New Feature > Components: streams >Reporter: Giridhar Addepalli >Priority: Major > > Since Kafka Streams applications have lot of state in the stores in general, > it would be good to remember the assignment of partitions to machines. So > that when whole application is restarted for whatever reason, there is a way > to use past assignment of partitions to machines and there won't be need to > build up state by reading off of changelog kafka topic and would result in > faster start-up. > Samza has support for Host Affinity > ([https://samza.apache.org/learn/documentation/0.14/yarn/yarn-host-affinity.html]) > KIP-54 > ([https://cwiki.apache.org/confluence/display/KAFKA/KIP-54+-+Sticky+Partition+Assignment+Strategy)] > , handles cases where some members of consumer group goes down / comes up, > and KIP-54 ensures there is minimal diff between assignments before and after > rebalance. > But to handle whole restart use case, we need to remember past assignment > somewhere, and use it after restart. > Please let us know if this is already solved problem / some cleaner way of > achieving this objective -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (KAFKA-6634) Delay initiating the txn on producers until initializeTopology with EOS turned on
[ https://issues.apache.org/jira/browse/KAFKA-6634?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang resolved KAFKA-6634. -- Resolution: Fixed Fix Version/s: 1.1.0 > Delay initiating the txn on producers until initializeTopology with EOS > turned on > - > > Key: KAFKA-6634 > URL: https://issues.apache.org/jira/browse/KAFKA-6634 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Guozhang Wang >Assignee: Guozhang Wang >Priority: Major > Fix For: 1.1.0 > > > In Streams EOS implementation, the created producers for tasks will initiate > a txn immediately after being created in the constructor of `StreamTask`. > However, the task may not process any data and hence producer may not send > any records for that started txn for a long time because of the restoration > process. And with default txn.session.timeout valued at 60 seconds, it means > that if the restoration takes more than that amount of time, upon starting > the producer will immediately get the error that its producer epoch is > already old. > To fix this, we should consider instantiating the txn only after the > restoration phase is done. Although this may have a caveat that if the > producer is already fenced, it will not be notified until then, in > initializeTopology. But I think this should not be a correctness issue since > during the restoration process we do not make any changes to the processing > state. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
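The shape of the fix, paraphrased as a compilable sketch rather than the real StreamTask (the merged PR diff in the following message shows the actual change): initTransactions() stays in the constructor, while the first beginTransaction() moves to initializeTopology():
{code:java}
import org.apache.kafka.clients.producer.Producer;

// Not the real StreamTask, just its shape after the fix:
class StreamTaskSketch {
    private final Producer<byte[], byte[]> producer;
    private final boolean eosEnabled;
    private boolean transactionInFlight = false;

    StreamTaskSketch(final Producer<byte[], byte[]> producer, final boolean eosEnabled) {
        this.producer = producer;
        this.eosEnabled = eosEnabled;
        if (eosEnabled) {
            // Still done eagerly: blocks until a transaction left over from a
            // previous incarnation completes.
            producer.initTransactions();
        }
        // beginTransaction() no longer happens here, so the txn session cannot
        // time out while the task spends minutes restoring state.
    }

    void initializeTopology() {
        if (eosEnabled) {
            // Deferred: the first transaction starts only once the task is
            // actually ready to process records.
            producer.beginTransaction();
            transactionInFlight = true;
        }
    }
}
{code}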
[jira] [Commented] (KAFKA-6634) Delay initiating the txn on producers until initializeTopology with EOS turned on
[ https://issues.apache.org/jira/browse/KAFKA-6634?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16397130#comment-16397130 ] ASF GitHub Bot commented on KAFKA-6634: --- guozhangwang closed pull request #4684: KAFKA-6634: Delay starting new transaction in task.initializeTopology URL: https://github.com/apache/kafka/pull/4684 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java index 8529c9eca88..c806bfde47e 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java @@ -90,6 +90,7 @@ void addNewTask(final T task) { * @return partitions that are ready to be resumed * @throws IllegalStateException If store gets registered after initialized is already finished * @throws StreamsException if the store's change log does not contain the partition + * @throws TaskMigratedException if the task producer got fenced (EOS only) */ Set<TopicPartition> initializeNewTasks() { final Set<TopicPartition> readyPartitions = new HashSet<>(); @@ -240,18 +241,21 @@ boolean maybeResumeSuspendedTask(final TaskId taskId, final Set<TopicPartition> log.trace("found suspended {} {}", taskTypeName, taskId); if (task.partitions().equals(partitions)) { suspended.remove(taskId); +task.resume(); try { -task.resume(); +transitionToRunning(task, new HashSet<TopicPartition>()); } catch (final TaskMigratedException e) { +// we need to catch migration exception internally since this function +// is triggered in the rebalance callback log.info("Failed to resume {} {} since it got migrated to another thread already. 
" + "Closing it as zombie before triggering a new rebalance.", taskTypeName, task.id()); final RuntimeException fatalException = closeZombieTask(task); +running.remove(task.id()); if (fatalException != null) { throw fatalException; } throw e; } -transitionToRunning(task, new HashSet()); log.trace("resuming suspended {} {}", taskTypeName, task.id()); return true; } else { @@ -271,6 +275,9 @@ private void addToRestoring(final T task) { } } +/** + * @throws TaskMigratedException if the task producer got fenced (EOS only) + */ private void transitionToRunning(final T task, final Set readyPartitions) { log.debug("transitioning {} {} to running", taskTypeName, task.id()); running.put(task.id(), task); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java index b8777ad5521..8d6e56a17aa 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java @@ -100,7 +100,6 @@ void removeAllSensors() { * @param cache the {@link ThreadCache} created by the thread * @param time the system {@link Time} of the thread * @param producer the instance of {@link Producer} used to produce records - * @throws TaskMigratedException if the task producer got fenced (EOS only) */ public StreamTask(final TaskId id, final Collection partitions, @@ -149,14 +148,11 @@ public StreamTask(final TaskId id, partitionGroup = new PartitionGroup(partitionQueues); stateMgr.registerGlobalStateStores(topology.globalStateStores()); + +// initialize transactions if eos is turned on, which will block if the previous transaction has not +// completed yet; do not start the first transaction until the topology has been initialized later if (eosEnabled) { -try { -this.producer.initTransactions(); -this.producer.beginTransaction(); -} catch (final ProducerFencedException fatal) { -throw new TaskMigratedException(this, fatal); -} -transactionInFlight = true; +this.producer.initTransactions(); } } @@
[jira] [Commented] (KAFKA-4027) Leader for a cetain partition unavailable forever
[ https://issues.apache.org/jira/browse/KAFKA-4027?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16397022#comment-16397022 ] Antonio Verardi commented on KAFKA-4027: [~tuyuri] I think I am having the exact same issue and on the exact same version. Did you ever figured out what caused it? > Leader for a cetain partition unavailable forever > - > > Key: KAFKA-4027 > URL: https://issues.apache.org/jira/browse/KAFKA-4027 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 0.10.0.0 >Reporter: tu nguyen khac >Priority: Major > > I have a cluster of brokers ( 9 box) , i 'm naming it from 0 --> 8 . > Yesterday some servers went down ( hard reset ) i regularly restart these > server ( down servers ) but after that some topics cannot assign leader > i checked server log and retrieved these logging : > kafka.common.NotAssignedReplicaException: Leader 1 failed to record follower > 6's position -1 since the replica is not recognized to be one of the assigned > replicas 1 for partition [tos_htv3tv.com,31]. > at > kafka.cluster.Partition.updateReplicaLogReadResult(Partition.scala:251) > at > kafka.server.ReplicaManager$$anonfun$updateFollowerLogReadResults$2.apply(ReplicaManager.scala:864) > at > kafka.server.ReplicaManager$$anonfun$updateFollowerLogReadResults$2.apply(ReplicaManager.scala:861) > at > scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:221) > at > scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:428) > at > kafka.server.ReplicaManager.updateFollowerLogReadResults(ReplicaManager.scala:861) > at kafka.server.ReplicaManager.fetchMessages(ReplicaManager.scala:470) > at kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:496) > at kafka.server.KafkaApis.handle(KafkaApis.scala:77) > at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:60) > at java.lang.Thread.run(Thread.java:745) > i tried to run Prefered Leader but it didn't work ( some partitions has node > leader ) :( -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-6645) Sticky Partition Assignment to facilitate faster restarts of kafka streams applications
[ https://issues.apache.org/jira/browse/KAFKA-6645?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Giridhar Addepalli updated KAFKA-6645: -- Summary: Sticky Partition Assignment to facilitate faster restarts of kafka streams applications (was: Sticky Partition Assignment across Kafka Streams application restarts) > Sticky Partition Assignment to facilitate faster restarts of kafka streams > applications > --- > > Key: KAFKA-6645 > URL: https://issues.apache.org/jira/browse/KAFKA-6645 > Project: Kafka > Issue Type: New Feature > Components: streams >Reporter: Giridhar Addepalli >Priority: Major > > Since Kafka Streams applications have lot of state in the stores in the > general, it would be good to remember the assignment of partitions to > machines. So that when whole application is restarted for whatever reason, > there is a way to use past assignment of partitions to machines and there > won't be need to build up state by reading off of changelog kafka topic and > would result in faster start-up. > Samza has support for Host Affinity > (https://samza.apache.org/learn/documentation/0.14/yarn/yarn-host-affinity.html) > KIP-54 > ([https://cwiki.apache.org/confluence/display/KAFKA/KIP-54+-+Sticky+Partition+Assignment+Strategy)] > , handles cases where some members of consumer group goes down / comes up, > and KIP-54 ensures there is minimal diff between assignments before and after > rebalance. > But to handle whole restart use case, we need to remember past assignment > somewhere, and use it after restart. > Please let us know if this is already solved problem / some cleaner way of > achieving this objective -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-6645) Sticky Partition Assignment across Kafka Streams application restarts
[ https://issues.apache.org/jira/browse/KAFKA-6645?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Giridhar Addepalli updated KAFKA-6645: -- Issue Type: New Feature (was: Bug) > Sticky Partition Assignment across Kafka Streams application restarts > - > > Key: KAFKA-6645 > URL: https://issues.apache.org/jira/browse/KAFKA-6645 > Project: Kafka > Issue Type: New Feature > Components: streams >Reporter: Giridhar Addepalli >Priority: Major > > Since Kafka Streams applications have lot of state in the stores in the > general, it would be good to remember the assignment of partitions to > machines. So that when whole application is restarted for whatever reason, > there is a way to use past assignment of partitions to machines and there > won't be need to build up state by reading off of changelog kafka topic and > would result in faster start-up. > Samza has support for Host Affinity > (https://samza.apache.org/learn/documentation/0.14/yarn/yarn-host-affinity.html) > KIP-54 > ([https://cwiki.apache.org/confluence/display/KAFKA/KIP-54+-+Sticky+Partition+Assignment+Strategy)] > , handles cases where some members of consumer group goes down / comes up, > and KIP-54 ensures there is minimal diff between assignments before and after > rebalance. > But to handle whole restart use case, we need to remember past assignment > somewhere, and use it after restart. > Please let us know if this is already solved problem / some cleaner way of > achieving this objective -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-6645) Sticky Partition Assignment across Kafka Streams application restarts
Giridhar Addepalli created KAFKA-6645: - Summary: Sticky Partition Assignment across Kafka Streams application restarts Key: KAFKA-6645 URL: https://issues.apache.org/jira/browse/KAFKA-6645 Project: Kafka Issue Type: Bug Components: streams Reporter: Giridhar Addepalli Since Kafka Streams applications have lot of state in the stores in the general, it would be good to remember the assignment of partitions to machines. So that when whole application is restarted for whatever reason, there is a way to use past assignment of partitions to machines and there won't be need to build up state by reading off of changelog kafka topic and would result in faster start-up. Samza has support for Host Affinity (https://samza.apache.org/learn/documentation/0.14/yarn/yarn-host-affinity.html) KIP-54 ([https://cwiki.apache.org/confluence/display/KAFKA/KIP-54+-+Sticky+Partition+Assignment+Strategy)] , handles cases where some members of consumer group goes down / comes up, and KIP-54 ensures there is minimal diff between assignments before and after rebalance. But to handle whole restart use case, we need to remember past assignment somewhere, and use it after restart. Please let us know if this is already solved problem / some cleaner way of achieving this objective -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (KAFKA-6535) Set default retention ms for Streams repartition topics to Long.MAX_VALUE
[ https://issues.apache.org/jira/browse/KAFKA-6535?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Khaireddine Rezgui reassigned KAFKA-6535: - Assignee: Khaireddine Rezgui > Set default retention ms for Streams repartition topics to Long.MAX_VALUE > - > > Key: KAFKA-6535 > URL: https://issues.apache.org/jira/browse/KAFKA-6535 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Guozhang Wang >Assignee: Khaireddine Rezgui >Priority: Major > Labels: needs-kip, newbie > > After KIP-220 / KIP-204, repartition topics in Streams are transient, so it > is better to set its default retention to infinity to allow any records be > pushed to it with old timestamps (think: bootstrapping, re-processing) and > just rely on the purging API to keeping its storage small. > More specifically, in {{RepartitionTopicConfig}} we have a few default > overrides for repartition topic configs, we should just add the override for > {{TopicConfig.RETENTION_MS_CONFIG}} to set it to Long.MAX_VALUE. This still > allows users to override themselves if they want via > {{StreamsConfig.TOPIC_PREFIX}}. We need to add unit test to verify this > update takes effect. > In addition to the code change, we also need to have doc changes in > streams/upgrade_guide.html specifying this default value change. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
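The user-side override mentioned at the end of the description could look like this sketch; note that topicPrefix() routes the property to the internal topics Streams creates, so it can affect changelog topics as well as repartition topics:
{code:java}
import java.util.Properties;
import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.streams.StreamsConfig;

public class RepartitionRetentionOverride {
    public static Properties config() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app"); // hypothetical
        // Route a topic-level config to the internal topics Streams creates;
        // this is the user override the ticket says must remain possible.
        props.put(StreamsConfig.topicPrefix(TopicConfig.RETENTION_MS_CONFIG),
                Long.MAX_VALUE);
        return props;
    }
}
{code}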
[jira] [Commented] (KAFKA-6576) Configurable Quota Management (KIP-257)
[ https://issues.apache.org/jira/browse/KAFKA-6576?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16396741#comment-16396741 ] ASF GitHub Bot commented on KAFKA-6576: --- rajinisivaram opened a new pull request #4699: KAFKA-6576: Configurable Quota Management (KIP-257) URL: https://github.com/apache/kafka/pull/4699 Enable quota calculation to be customized using a configurable callback. See KIP-257 for details. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Configurable Quota Management (KIP-257) > --- > > Key: KAFKA-6576 > URL: https://issues.apache.org/jira/browse/KAFKA-6576 > Project: Kafka > Issue Type: New Feature > Components: core >Reporter: Rajini Sivaram >Assignee: Rajini Sivaram >Priority: Major > > See > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-257+-+Configurable+Quota+Management] > for details. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6637) if set topic config segment.ms=0 Kafka broker won't be able to start
[ https://issues.apache.org/jira/browse/KAFKA-6637?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16396711#comment-16396711 ] ASF GitHub Bot commented on KAFKA-6637: --- huxihx opened a new pull request #4698: KAFKA-6637: Avoid divide-by-zero error with segment.ms set to zero URL: https://github.com/apache/kafka/pull/4698 https://issues.apache.org/jira/browse/KAFKA-6637 Changes of this patch include: 1. Avoid divide-by-zero error with segment.ms set to zero 2. Ignore the effect of setting `log.roll.ms` to zero. Use `log.roll.hours` instead ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > if set topic config segment.ms=0 Kafka broker won't be able to start > > > Key: KAFKA-6637 > URL: https://issues.apache.org/jira/browse/KAFKA-6637 > Project: Kafka > Issue Type: Bug >Affects Versions: 1.0.0 >Reporter: Chong Wang >Assignee: huxihx >Priority: Major > > If set topic config segment.ms to 0, Kafka server won't be able to start > because of a FATAL error: > [2018-03-12 19:05:40,196] FATAL [KafkaServer id=2] Fatal error during > KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer) > java.lang.ArithmeticException: / by zero at > kafka.log.LogConfig.randomSegmentJitter(LogConfig.scala:100) at > kafka.log.Log.loadSegments(Log.scala:419) at > kafka.log.Log.<init>(Log.scala:203) at kafka.log.Log$.apply(Log.scala:1734) > at kafka.log.LogManager.kafka$log$LogManager$$loadLog(LogManager.scala:221) > at > kafka.log.LogManager$$anonfun$loadLogs$2$$anonfun$8$$anonfun$apply$16$$anonfun$apply$2.apply$mcV$sp(LogManager.scala:292) > at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:61) at > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at > java.util.concurrent.FutureTask.run(FutureTask.java:266) at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > at java.lang.Thread.run(Thread.java:748) > [https://github.com/apache/kafka/blob/1.0/core/src/main/scala/kafka/log/LogConfig.scala#L100] > So the minimum value shouldn't be 0 > https://kafka.apache.org/documentation/#topicconfigs -- This message was sent by Atlassian JIRA (v7.6.3#76005)
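Reduced to a sketch, the failing expression and the guard the patch introduces (Kafka's Utils.abs also handles Integer.MIN_VALUE, which Math.abs below does not):
{code:java}
public class SegmentJitterDemo {
    public static void main(String[] args) {
        long segmentJitterMs = 60_000L;
        long segmentMs = 0L; // topic configured with segment.ms=0
        try {
            // The pre-patch expression from LogConfig.randomSegmentJitter:
            long jitter = Math.abs(new java.util.Random().nextInt())
                    % Math.min(segmentJitterMs, segmentMs);
            System.out.println(jitter);
        } catch (ArithmeticException e) {
            System.out.println(e); // java.lang.ArithmeticException: / by zero
        }
        // The patch guards both operands before taking the modulus:
        long safeJitter = (segmentJitterMs == 0 || segmentMs == 0) ? 0
                : Math.abs(new java.util.Random().nextInt())
                        % Math.min(segmentJitterMs, segmentMs);
        System.out.println(safeJitter);
    }
}
{code}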
[jira] [Created] (KAFKA-6644) Make Server Info more generic for Kafka Interactive Queries
Mike Graham created KAFKA-6644: -- Summary: Make Server Info more generic for Kafka Interactive Queries Key: KAFKA-6644 URL: https://issues.apache.org/jira/browse/KAFKA-6644 Project: Kafka Issue Type: Improvement Components: config, streams Reporter: Mike Graham when working to implement *kafka streams interactive queries*, i see that i can set `application.server` with `host:port` *i would like a more generic mechanism to set additional properties.* i'm using cloud foundry containers for my kafka streams app. i scale out my containers using `*cf scale*`. each gets its own instance id. the *instance id* can be used in an http header to get cloud foundry to route the http to the correct instance https://docs.cloudfoundry.org/concepts/http-routing.html#app-instance-routing i realize, per Matthias J Sax, that Kafka Streams only distributes the information but does not use it. Thus, i can put the instance-id as the port. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6195) DNS alias support for secured connections
[ https://issues.apache.org/jira/browse/KAFKA-6195?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16396694#comment-16396694 ] Rajini Sivaram commented on KAFKA-6195: --- Looks good to me, I will respond on the discuss thread. > DNS alias support for secured connections > - > > Key: KAFKA-6195 > URL: https://issues.apache.org/jira/browse/KAFKA-6195 > Project: Kafka > Issue Type: Improvement > Components: clients >Reporter: Jonathan Skrzypek >Priority: Major > > It seems clients can't use a dns alias in front of a secured Kafka cluster. > So applications can only specify a list of hosts or IPs in bootstrap.servers > instead of an alias encompassing all cluster nodes. > Using an alias in bootstrap.servers results in the following error : > javax.security.sasl.SaslException: An error: > (java.security.PrivilegedActionException: javax.security.sasl.SaslException: > GSS initiate failed [Caused by GSSException: No valid credentials provided > (Mechanism level: Fail to create credential. (63) - No service creds)]) > occurred when evaluating SASL token received from the Kafka Broker. Kafka > Client will go to AUTH_FAILED state. [Caused by > javax.security.sasl.SaslException: GSS initiate failed [Caused by > GSSException: No valid credentials provided (Mechanism level: Fail to create > credential. (63) - No service creds)]] > When using SASL/Kerberos authentication, the kafka server principal is of the > form kafka@kafka/broker1.hostname@example.com > Kerberos requires that the hosts can be resolved by their FQDNs. > During SASL handshake, the client will create a SASL token and then send it > to kafka for auth. > But to create a SASL token the client first needs to be able to validate that > the broker's kerberos is a valid one. > There are 3 potential options : > 1. Creating a single kerberos principal not linked to a host but to an alias > and reference it in the broker jaas file. > But I think the kerberos infrastructure would refuse to validate it, so the > SASL handshake would still fail > 2. Modify the client bootstrap mechanism to detect whether bootstrap.servers > contains a dns alias. If it does, resolve and expand the alias to retrieve > all hostnames behind it and add them to the list of nodes. > This could be done by modifying parseAndValidateAddresses() in ClientUtils > 3. Add a cluster.alias parameter that would be handled by the logic above. > Having another parameter to avoid confusion on how bootstrap.servers works > behind the scene. > Thoughts ? > I would be happy to contribute the change for any of the options. > I believe the ability to use a dns alias instead of static lists of brokers > would bring good deployment flexibility. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
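A sketch of option 2 from the description: resolve the alias, then reverse-map each address to its FQDN so Kerberos can validate each broker's host-based principal. The alias is hypothetical, and this is not the proposed ClientUtils change itself:
{code:java}
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.List;

public class AliasExpander {
    // Expand a DNS alias into individual "fqdn:port" bootstrap entries.
    static List<String> expand(String alias, int port) throws UnknownHostException {
        List<String> bootstrap = new ArrayList<>();
        for (InetAddress address : InetAddress.getAllByName(alias)) {
            // getCanonicalHostName() performs the reverse lookup that yields
            // the FQDN that Kerberos principals are keyed on.
            bootstrap.add(address.getCanonicalHostName() + ":" + port);
        }
        return bootstrap;
    }

    public static void main(String[] args) throws UnknownHostException {
        System.out.println(String.join(",", expand("kafka.example.com", 9093)));
    }
}
{code}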
[jira] [Commented] (KAFKA-6637) if set topic config segment.ms=0 Kafka broker won't be able to start
[ https://issues.apache.org/jira/browse/KAFKA-6637?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16396622#comment-16396622 ] ASF GitHub Bot commented on KAFKA-6637: --- huxihx closed pull request #4697: KAFKA-6637: Avoid divide by zero exception when segment.ms is set to 0 URL: https://github.com/apache/kafka/pull/4697 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/core/src/main/scala/kafka/log/LogConfig.scala b/core/src/main/scala/kafka/log/LogConfig.scala index 30ca333dd28..2a941bc4196 100755 --- a/core/src/main/scala/kafka/log/LogConfig.scala +++ b/core/src/main/scala/kafka/log/LogConfig.scala @@ -98,7 +98,7 @@ case class LogConfig(props: java.util.Map[_, _], overriddenConfigs: Set[String] val FollowerReplicationThrottledReplicas = getList(LogConfig.FollowerReplicationThrottledReplicasProp) def randomSegmentJitter: Long = -if (segmentJitterMs == 0) 0 else Utils.abs(scala.util.Random.nextInt()) % math.min(segmentJitterMs, segmentMs) +if (segmentJitterMs == 0 || segmentMs == 0) 0 else Utils.abs(scala.util.Random.nextInt()) % math.min(segmentJitterMs, segmentMs) } object LogConfig { diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index cf22305caf7..e35d188e943 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -1088,7 +1088,10 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO def logIndexSizeMaxBytes = getInt(KafkaConfig.LogIndexSizeMaxBytesProp) def logIndexIntervalBytes = getInt(KafkaConfig.LogIndexIntervalBytesProp) def logDeleteDelayMs = getLong(KafkaConfig.LogDeleteDelayMsProp) - def logRollTimeMillis: java.lang.Long = Option(getLong(KafkaConfig.LogRollTimeMillisProp)).getOrElse(60 * 60 * 1000L * getInt(KafkaConfig.LogRollTimeHoursProp)) + def logRollTimeMillis: java.lang.Long = Option(getLong(KafkaConfig.LogRollTimeMillisProp)) match { + case Some(rollMs) if rollMs > 0 => rollMs + case _ => 60 * 60 * 1000L * getInt(KafkaConfig.LogRollTimeHoursProp) + } def logRollTimeJitterMillis: java.lang.Long = Option(getLong(KafkaConfig.LogRollTimeJitterMillisProp)).getOrElse(60 * 60 * 1000L * getInt(KafkaConfig.LogRollTimeJitterHoursProp)) def logFlushIntervalMs: java.lang.Long = Option(getLong(KafkaConfig.LogFlushIntervalMsProp)).getOrElse(getLong(KafkaConfig.LogFlushSchedulerIntervalMsProp)) def minInSyncReplicas = getInt(KafkaConfig.MinInSyncReplicasProp) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > if set topic config segment.ms=0 Kafka broker won't be able to start > > > Key: KAFKA-6637 > URL: https://issues.apache.org/jira/browse/KAFKA-6637 > Project: Kafka > Issue Type: Bug >Affects Versions: 1.0.0 >Reporter: Chong Wang >Assignee: huxihx >Priority: Major > > If set topic config segment.ms to 0, Kafka server won't be able to start > because of a FATAL error: > [2018-03-12 19:05:40,196] FATAL [KafkaServer id=2] Fatal error during > KafkaServer startup. 
Prepare to shutdown (kafka.server.KafkaServer) > java.lang.ArithmeticException: / by zero at > kafka.log.LogConfig.randomSegmentJitter(LogConfig.scala:100) at > kafka.log.Log.loadSegments(Log.scala:419) at > kafka.log.Log.<init>(Log.scala:203) at kafka.log.Log$.apply(Log.scala:1734) > at kafka.log.LogManager.kafka$log$LogManager$$loadLog(LogManager.scala:221) > at > kafka.log.LogManager$$anonfun$loadLogs$2$$anonfun$8$$anonfun$apply$16$$anonfun$apply$2.apply$mcV$sp(LogManager.scala:292) > at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:61) at > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at > java.util.concurrent.FutureTask.run(FutureTask.java:266) at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > at java.lang.Thread.run(Thread.java:748) > [https://github.com/apache/kafka/blob/1.0/core/src/main/scala/kafka/log/LogConfig.scala#L100] > So the minimum value shouldn't be 0 > https://kafka.apache.org/documentation/#topicconfigs -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (KAFKA-6637) if set topic config segment.ms=0 Kafka broker won't be able to start
[ https://issues.apache.org/jira/browse/KAFKA-6637?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] huxihx reassigned KAFKA-6637: - Assignee: huxihx > if set topic config segment.ms=0 Kafka broker won't be able to start > > > Key: KAFKA-6637 > URL: https://issues.apache.org/jira/browse/KAFKA-6637 > Project: Kafka > Issue Type: Bug >Affects Versions: 1.0.0 >Reporter: Chong Wang >Assignee: huxihx >Priority: Major > > If set topic config segment.ms to 0, Kafka server won't be able to start > because of a FATAL error: > [2018-03-12 19:05:40,196] FATAL [KafkaServer id=2] Fatal error during > KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer) > java.lang.ArithmeticException: / by zero at > kafka.log.LogConfig.randomSegmentJitter(LogConfig.scala:100) at > kafka.log.Log.loadSegments(Log.scala:419) at > kafka.log.Log.<init>(Log.scala:203) at kafka.log.Log$.apply(Log.scala:1734) > at kafka.log.LogManager.kafka$log$LogManager$$loadLog(LogManager.scala:221) > at > kafka.log.LogManager$$anonfun$loadLogs$2$$anonfun$8$$anonfun$apply$16$$anonfun$apply$2.apply$mcV$sp(LogManager.scala:292) > at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:61) at > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at > java.util.concurrent.FutureTask.run(FutureTask.java:266) at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > at java.lang.Thread.run(Thread.java:748) > [https://github.com/apache/kafka/blob/1.0/core/src/main/scala/kafka/log/LogConfig.scala#L100] > So the minimum value shouldn't be 0 > https://kafka.apache.org/documentation/#topicconfigs -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-6617) Improve controller performance by batching reassignment znode write operation
[ https://issues.apache.org/jira/browse/KAFKA-6617?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dong Lin updated KAFKA-6617: Summary: Improve controller performance by batching reassignment znode write operation (was: KAFKA-6617; Improve controller performance by batching reassignment znode write operation) > Improve controller performance by batching reassignment znode write operation > - > > Key: KAFKA-6617 > URL: https://issues.apache.org/jira/browse/KAFKA-6617 > Project: Kafka > Issue Type: Improvement >Reporter: Dong Lin >Assignee: Dong Lin >Priority: Minor > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-6617) KAFKA-6617; Improve controller performance by batching reassignment znode write operation
[ https://issues.apache.org/jira/browse/KAFKA-6617?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dong Lin updated KAFKA-6617: Summary: KAFKA-6617; Improve controller performance by batching reassignment znode write operation (was: Improve controller performance by batching reassignment znode write operation) > KAFKA-6617; Improve controller performance by batching reassignment znode > write operation > - > > Key: KAFKA-6617 > URL: https://issues.apache.org/jira/browse/KAFKA-6617 > Project: Kafka > Issue Type: Improvement >Reporter: Dong Lin >Assignee: Dong Lin >Priority: Minor > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6555) Making state store queryable during restoration
[ https://issues.apache.org/jira/browse/KAFKA-6555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16396593#comment-16396593 ] Matthias J. Sax commented on KAFKA-6555: Thanks [~asurana]! Looking forward to your KIP! > Making state store queryable during restoration > --- > > Key: KAFKA-6555 > URL: https://issues.apache.org/jira/browse/KAFKA-6555 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Ashish Surana >Assignee: Ashish Surana >Priority: Major > > State stores in Kafka Streams are currently only queryable when the StreamTask is > in the RUNNING state. The idea is to make them queryable even in the RESTORATION > (PARTITION_ASSIGNED) state, as the time spent on restoration can be huge, and > making the data inaccessible during this time amounts to downtime that is not acceptable > for many applications. > When the active partition goes down, one of the following occurs: > # One of the standby replica partitions gets promoted to active: the replica task > has to restore the remaining state from the changelog topic before it can > become RUNNING. The time taken for this depends on how far the replica is > lagging behind. During this restoration time the state store for that > partition is currently not queryable, resulting in partition downtime. We > can make the state store partition queryable for the data already present in > the state store. > # When there is no replica or standby task, the active task will be started > on one of the existing nodes. That node has to build the entire state from the > changelog topic, which can take a lot of time depending on how big the > changelog topic is, and keeping the state store unqueryable during this time is > downtime for the partition. > This is a very important improvement, as it could significantly improve the availability > of microservices developed using Kafka Streams. > I am working on a patch for this change. Any feedback or comments are welcome. > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
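For context on the downtime being described: with Interactive Queries as they stand, KafkaStreams#store throws InvalidStateStoreException while a task is restoring, so callers can only block and retry until the store reaches RUNNING. A minimal sketch of that workaround follows; the helper name, retry count, and backoff are illustrative assumptions, not anything from the ticket.

{code}
import org.apache.kafka.streams.KafkaStreams
import org.apache.kafka.streams.errors.InvalidStateStoreException
import org.apache.kafka.streams.state.{QueryableStoreTypes, ReadOnlyKeyValueStore}

// Poll until the named store becomes queryable. Every iteration that lands in
// the catch block is restoration downtime for that partition: exactly the
// window this ticket proposes to make readable.
def storeWhenReady[K, V](streams: KafkaStreams, name: String,
                         maxRetries: Int = 30, backoffMs: Long = 1000L): ReadOnlyKeyValueStore[K, V] = {
  var attempts = 0
  while (true) {
    try {
      return streams.store(name, QueryableStoreTypes.keyValueStore[K, V]())
    } catch {
      case e: InvalidStateStoreException =>
        attempts += 1
        if (attempts >= maxRetries) throw e
        Thread.sleep(backoffMs)
    }
  }
  throw new IllegalStateException("unreachable") // satisfies the type checker
}
{code}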
[jira] [Commented] (KAFKA-6024) Consider moving validation in KafkaConsumer ahead of call to acquireAndEnsureOpen()
[ https://issues.apache.org/jira/browse/KAFKA-6024?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16396592#comment-16396592 ] ASF GitHub Bot commented on KAFKA-6024: --- hachikuji closed pull request #4617: KAFKA-6024 Move validation in KafkaConsumer ahead of acquireAndEnsure… URL: https://github.com/apache/kafka/pull/4617 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic):
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java index 3cd034eff76..81137f3c8dd 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java @@ -966,13 +966,12 @@ public void subscribe(Collection<String> topics) { */ @Override public void subscribe(Pattern pattern, ConsumerRebalanceListener listener) { +if (pattern == null) +throw new IllegalArgumentException("Topic pattern to subscribe to cannot be null"); + acquireAndEnsureOpen(); try { -if (pattern == null) -throw new IllegalArgumentException("Topic pattern to subscribe to cannot be null"); - throwIfNoAssignorsConfigured(); - log.debug("Subscribed to pattern: {}", pattern); this.subscriptions.subscribe(pattern, listener); this.metadata.needMetadataForAllTopics(true); @@ -1337,11 +1336,11 @@ public void commitAsync(final Map<TopicPartition, OffsetAndMetadata> offsets, Of */ @Override public void seek(TopicPartition partition, long offset) { +if (offset < 0) +throw new IllegalArgumentException("seek offset must not be a negative number"); + acquireAndEnsureOpen(); try { -if (offset < 0) -throw new IllegalArgumentException("seek offset must not be a negative number"); - log.debug("Seeking to offset {} for partition {}", offset, partition); this.subscriptions.seek(partition, offset); } finally { @@ -1357,11 +1356,11 @@ public void seek(TopicPartition partition, long offset) { * @throws IllegalArgumentException if {@code partitions} is {@code null} or the provided TopicPartition is not assigned to this consumer */ public void seekToBeginning(Collection<TopicPartition> partitions) { +if (partitions == null) +throw new IllegalArgumentException("Partitions collection cannot be null"); + acquireAndEnsureOpen(); try { -if (partitions == null) { -throw new IllegalArgumentException("Partitions collection cannot be null"); -} Collection<TopicPartition> parts = partitions.size() == 0 ? this.subscriptions.assignedPartitions() : partitions; for (TopicPartition tp : parts) { log.debug("Seeking to beginning of partition {}", tp); @@ -1383,11 +1382,11 @@ public void seekToBeginning(Collection<TopicPartition> partitions) { * @throws IllegalArgumentException if {@code partitions} is {@code null} or the provided TopicPartition is not assigned to this consumer */ public void seekToEnd(Collection<TopicPartition> partitions) { +if (partitions == null) +throw new IllegalArgumentException("Partitions collection cannot be null"); + acquireAndEnsureOpen(); try { -if (partitions == null) { -throw new IllegalArgumentException("Partitions collection cannot be null"); -} Collection<TopicPartition> parts = partitions.size() == 0 ? this.subscriptions.assignedPartitions() : partitions; for (TopicPartition tp : parts) { log.debug("Seeking to end of partition {}", tp); This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Consider moving validation in KafkaConsumer ahead of call to > acquireAndEnsureOpen() > --- > > Key: KAFKA-6024 > URL: https://issues.apache.org/jira/browse/KAFKA-6024 > Project: Kafka > Issue Type: Improvement >Reporter: Ted Yu >Assignee: siva santhalingam >Priority: Minor > Fix For: 1.2.0 > > > In several methods, parameter validation is done after calling > acquireAndEnsureOpen() : > {code} >
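The pattern in this patch generalizes: argument checks that have no side effects and whose outcome cannot change while the lock is held should run before the lock is acquired, so an invalid call fails fast and never touches shared client state. A minimal sketch of the idea, using a hypothetical class rather than the actual KafkaConsumer (whose acquire is reentrant and more involved):

{code}
import java.util.ConcurrentModificationException
import java.util.concurrent.atomic.AtomicReference

// Hypothetical single-owner client illustrating validate-before-acquire.
class LightweightClient {
  private val owner = new AtomicReference[Thread](null)

  private def acquireAndEnsureOpen(): Unit =
    if (!owner.compareAndSet(null, Thread.currentThread()))
      throw new ConcurrentModificationException("client is not safe for multi-threaded access")

  private def release(): Unit = owner.set(null)

  def seek(offset: Long): Unit = {
    // Validate first: rejecting bad input here avoids an acquire/release
    // round trip entirely and cannot leave internal state half-touched.
    if (offset < 0)
      throw new IllegalArgumentException("seek offset must not be a negative number")
    acquireAndEnsureOpen()
    try {
      // ... update the internal position here ...
    } finally {
      release()
    }
  }
}
{code}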
[jira] [Resolved] (KAFKA-6024) Consider moving validation in KafkaConsumer ahead of call to acquireAndEnsureOpen()
[ https://issues.apache.org/jira/browse/KAFKA-6024?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson resolved KAFKA-6024. Resolution: Fixed Fix Version/s: 1.2.0 > Consider moving validation in KafkaConsumer ahead of call to > acquireAndEnsureOpen() > --- > > Key: KAFKA-6024 > URL: https://issues.apache.org/jira/browse/KAFKA-6024 > Project: Kafka > Issue Type: Improvement >Reporter: Ted Yu >Assignee: siva santhalingam >Priority: Minor > Fix For: 1.2.0 > > > In several methods, parameter validation is done after calling > acquireAndEnsureOpen(): > {code} > public void seek(TopicPartition partition, long offset) { > acquireAndEnsureOpen(); > try { > if (offset < 0) > throw new IllegalArgumentException("seek offset must not be a > negative number"); > {code} > Since the parameter values would not change per invocation, it seems > performing validation ahead of the acquireAndEnsureOpen() call would be better. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-6643) Warm up new replicas from scratch when changelog topic has LIMITED retention time
[ https://issues.apache.org/jira/browse/KAFKA-6643?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Navinder Brar updated KAFKA-6643: - Description: In the current scenario, Kafka Streams has changelog Kafka topics (internal topics holding all the data for the store) which are used to build the state of replicas. So, if we keep the number of standby replicas at 1, we still get extra availability for persistent state stores, as the changelog Kafka topics are also replicated according to the broker replication policy, but that also means we are using at least 4 times the space (1 master store, 1 replica store, 1 changelog, 1 changelog replica). Now, if we have a year's data in the persistent stores (RocksDB), we don't want the changelog topics to hold a year's data as well, since that puts an unnecessary burden on the brokers (in terms of space). If we have to scale our Kafka Streams application (having 200-300 TB of data), we have to scale the Kafka brokers as well. We want to reduce this dependency and find ways to use the changelog topic as just a queue holding 2 or 3 days of data, and warm up the replicas from scratch in some other way. I have a few proposals in that respect. 1. Use a new Kafka topic, tied to each partition, which we warm up on the fly (when the node containing that partition crashes). Produce into this topic from another replica/active and build the new replica from this topic. 2. Use peer-to-peer file transfer (such as SFTP), as RocksDB can create backups, which can be transferred from the source node to the destination node when a new replica has to be built from scratch. 3. Use HDFS as an intermediary instead of a Kafka topic, where we can keep scheduled backups for each partition and use those to build new replicas. was: In the current scenario, Kafka Streams has changelog Kafka topics (internal topics holding all the data for the store) which are used to build the state of replicas. So, if we keep the number of standby replicas at 1, we still get extra availability for persistent state stores, as the changelog Kafka topics are also replicated according to the broker replication policy, but that also means we are using at least 4 times the space (1 master store, 1 replica store, 1 changelog, 1 changelog replica). Now, if we have a year's data in the persistent stores (RocksDB), we don't want the changelog topics to hold a year's data as well, since that puts an unnecessary burden on the brokers (in terms of space). If we have to scale our Kafka Streams application (having 200-300 TB of data), we have to scale the Kafka brokers as well. We want to reduce this dependency and find ways to use the changelog topic as just a queue holding 2 or 3 days of data, and warm up the replicas from scratch in some other way. I have a few proposals in that respect. 1. Use a new Kafka topic, tied to each partition, which we warm up on the fly (when the node containing that partition crashes). Produce into this topic from another replica/active and build the new replica from this topic. 2. Use peer-to-peer file transfer, as RocksDB can create backups, which can be transferred from the source node to the destination node when a new replica has to be built from scratch. 3. Use HDFS as an intermediary instead of a Kafka topic, where we can keep scheduled backups for each partition and use those to build new replicas.
> Warm up new replicas from scratch when changelog topic has LIMITED retention > time > - > > Key: KAFKA-6643 > URL: https://issues.apache.org/jira/browse/KAFKA-6643 > Project: Kafka > Issue Type: New Feature > Components: streams >Reporter: Navinder Brar >Priority: Major > > In the current scenario, Kafka Streams has changelog Kafka topics (internal > topics holding all the data for the store) which are used to build the state > of replicas. So, if we keep the number of standby replicas at 1, we still > get extra availability for persistent state stores, as the changelog Kafka topics > are also replicated according to the broker replication policy, but that also > means we are using at least 4 times the space (1 master store, 1 replica > store, 1 changelog, 1 changelog replica). > Now, if we have a year's data in the persistent stores (RocksDB), we don't want > the changelog topics to hold a year's data as well, since that puts an unnecessary > burden on the brokers (in terms of space). If we have to scale our Kafka Streams > application (having 200-300 TB of data), we have to scale the Kafka brokers > as well. We want to reduce this dependency and find ways to use the > changelog topic as just a queue holding 2 or 3 days of data, and warm up the > replicas from scratch in some other way. > I have a few proposals in that respect. > 1. Use a new Kafka topic, tied to each partition, which we warm up > on