[jira] [Updated] (KAFKA-6764) ConsoleConsumer behavior inconsistent when specifying --partition with --from-beginning
[ https://issues.apache.org/jira/browse/KAFKA-6764?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Larry McQueary updated KAFKA-6764:
    Summary: ConsoleConsumer behavior inconsistent when specifying --partition with --from-beginning  (was: ConsoleConsumer behavior inconsistent when specifying --partition --from-beginning )

> ConsoleConsumer behavior inconsistent when specifying --partition with --from-beginning
>
> Key: KAFKA-6764
> URL: https://issues.apache.org/jira/browse/KAFKA-6764
> Project: Kafka
> Issue Type: Bug
> Components: consumer
> Reporter: Larry McQueary
> Priority: Minor
> Labels: newbie
>
> Per its usage statement, {{kafka-console-consumer.sh}} ignores {{--from-beginning}} when the specified consumer group has committed offsets, and sets {{auto.offset.reset}} to {{latest}}. However, if {{--partition}} is also specified, {{--from-beginning}} is observed in all cases, whether there are committed offsets or not.
>
> This happens because when {{--from-beginning}} is specified, {{offsetArg}} is set to {{OffsetRequest.EarliestTime}}. However, {{offsetArg}} is [only passed to the constructor|https://github.com/apache/kafka/blob/fedac0cea74fce529ee1c0cefd6af53ecbdd/core/src/main/scala/kafka/tools/ConsoleConsumer.scala#L76-L79] for {{NewShinyConsumer}} when {{--partition}} is also specified.
>
> This case should either be handled consistently, or the usage statement should be modified to indicate the actual behavior/usage.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
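To make the two code paths concrete, here is a minimal Java sketch that models, with plain booleans, where the console consumer starts reading. It is a hypothetical model built from the description above, not ConsoleConsumer's actual code: `ConsoleStartModel`, `reported` and `consistent` are illustrative names, the "latest when --from-beginning is absent" default is an assumption, and `consistent` shows only one of the two resolutions the ticket allows (letting committed offsets win in both modes).

```java
/**
 * Hypothetical model of the console consumer's starting position as described
 * in KAFKA-6764. This is NOT ConsoleConsumer's real implementation.
 */
final class ConsoleStartModel {
    enum Start { EARLIEST, LATEST, COMMITTED }

    /** Behavior as reported in the ticket. */
    static Start reported(boolean fromBeginning, boolean partitionGiven, boolean hasCommittedOffsets) {
        if (partitionGiven) {
            // offsetArg is handed straight to the consumer, so committed offsets
            // are never consulted; without --from-beginning we assume "latest".
            return fromBeginning ? Start.EARLIEST : Start.LATEST;
        }
        if (hasCommittedOffsets) {
            // Group path: --from-beginning only influences auto.offset.reset,
            // which does not apply once the group has committed offsets.
            return Start.COMMITTED;
        }
        return fromBeginning ? Start.EARLIEST : Start.LATEST;
    }

    /** One possible consistent rule (an assumption): committed offsets always win. */
    static Start consistent(boolean fromBeginning, boolean hasCommittedOffsets) {
        if (hasCommittedOffsets) {
            return Start.COMMITTED;
        }
        return fromBeginning ? Start.EARLIEST : Start.LATEST;
    }
}
```

In this model, `reported(true, true, true)` yields `EARLIEST` while `reported(true, false, true)` yields `COMMITTED`: the same flags give different start positions depending only on whether `--partition` was passed, which is the inconsistency the ticket describes.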
[jira] [Created] (KAFKA-6764) ConsoleConsumer behavior inconsistent when specifying --partition --from-beginning
Larry McQueary created KAFKA-6764:

Summary: ConsoleConsumer behavior inconsistent when specifying --partition --from-beginning
Key: KAFKA-6764
URL: https://issues.apache.org/jira/browse/KAFKA-6764
Project: Kafka
Issue Type: Bug
Components: consumer
Reporter: Larry McQueary

Per its usage statement, {{kafka-console-consumer.sh}} ignores {{--from-beginning}} when the specified consumer group has committed offsets, and sets {{auto.offset.reset}} to {{latest}}. However, if {{--partition}} is also specified, {{--from-beginning}} is observed in all cases, whether there are committed offsets or not.

This happens because when {{--from-beginning}} is specified, {{offsetArg}} is set to {{OffsetRequest.EarliestTime}}. However, {{offsetArg}} is [only passed to the constructor|https://github.com/apache/kafka/blob/fedac0cea74fce529ee1c0cefd6af53ecbdd/core/src/main/scala/kafka/tools/ConsoleConsumer.scala#L76-L79] for {{NewShinyConsumer}} when {{--partition}} is also specified.

This case should either be handled consistently, or the usage statement should be modified to indicate the actual behavior/usage.
[jira] [Commented] (KAFKA-2526) Console Producer / Consumer's serde config is not working
[ https://issues.apache.org/jira/browse/KAFKA-2526?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16429606#comment-16429606 ]

huxihx commented on KAFKA-2526:

[~mgharat] Are you still working on this JIRA?

> Console Producer / Consumer's serde config is not working
>
> Key: KAFKA-2526
> URL: https://issues.apache.org/jira/browse/KAFKA-2526
> Project: Kafka
> Issue Type: Bug
> Reporter: Guozhang Wang
> Assignee: Mayuresh Gharat
> Priority: Major
> Labels: newbie
>
> Although the console producer lets one specify the key and value serializers, they are not actually used, because 1) it always serializes the input string with String.getBytes (hence it always assumes the string serializer) and 2) they are only passed to the old producer. The same issues exist in the console consumer.
>
> In addition, the configs in the console producer are messy: we have 1) some config values exposed as command-line parameters, 2) some config values in --producer-property and 3) some in --property.
>
> It would be great to clean up the configs in both the console producer and consumer, and put them into a single --property parameter, which could possibly also take a file to read property values from, leaving --new-producer as the only other command-line parameter.
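The consolidation proposed above could look roughly like the following Java sketch, which builds one `Properties` object from an optional properties-file body plus a list of `key=value` pairs such as a single repeated `--property` option might collect. The class and method names are hypothetical, the file is passed as a string to keep the sketch self-contained, and the precedence (command-line pairs override the file) is an assumption, not something the ticket specifies.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.List;
import java.util.Properties;

/**
 * Hypothetical single configuration entry point for the console tools,
 * in the spirit of KAFKA-2526. Not the real ConsoleProducer code.
 */
final class ConsoleConfigMerge {
    /** Parses "key=value" pairs, e.g. as collected from a repeated --property option. */
    static Properties fromPairs(List<String> pairs) {
        Properties props = new Properties();
        for (String pair : pairs) {
            int eq = pair.indexOf('=');
            if (eq <= 0) {
                throw new IllegalArgumentException("Expected key=value but got: " + pair);
            }
            props.setProperty(pair.substring(0, eq).trim(), pair.substring(eq + 1).trim());
        }
        return props;
    }

    /** Loads the file body first; command-line pairs then override it (assumed precedence). */
    static Properties merge(String fileContents, List<String> cliPairs) {
        Properties merged = new Properties();
        if (fileContents != null) {
            try {
                merged.load(new StringReader(fileContents));
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }
        merged.putAll(fromPairs(cliPairs));
        return merged;
    }
}
```

With one merged `Properties`, a tool could presumably instantiate whatever `key.serializer` / `value.serializer` it names instead of hard-coding `String.getBytes`; that step is left out because it depends on the Kafka client classes.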
[jira] [Commented] (KAFKA-6742) TopologyTestDriver error when dealing with stores from GlobalKTable
[ https://issues.apache.org/jira/browse/KAFKA-6742?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16429570#comment-16429570 ]

Matthias J. Sax commented on KAFKA-6742:

[~vale68] I added you to the list of contributors and assigned this ticket to you. You can assign tickets to yourself now, too.

> TopologyTestDriver error when dealing with stores from GlobalKTable
>
> Key: KAFKA-6742
> URL: https://issues.apache.org/jira/browse/KAFKA-6742
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 1.1.0
> Reporter: Valentino Proietti
> Assignee: Valentino Proietti
> Priority: Minor
>
> This JUnit test fails:
>
>     @Test
>     public void globalTable() {
>         StreamsBuilder builder = new StreamsBuilder();
>
>         @SuppressWarnings("unused")
>         final KTable<String, String> localTable = builder
>             .table("local",
>                 Consumed.with(Serdes.String(), Serdes.String()),
>                 Materialized.as("localStore"));
>
>         @SuppressWarnings("unused")
>         final GlobalKTable<String, String> globalTable = builder
>             .globalTable("global",
>                 Consumed.with(Serdes.String(), Serdes.String()),
>                 Materialized.as("globalStore"));
>         //
>         Properties props = new Properties();
>         props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "test");
>         props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost");
>         TopologyTestDriver testDriver = new TopologyTestDriver(builder.build(), props);
>         //
>         final KeyValueStore<String, String> localStore = testDriver.getKeyValueStore("localStore");
>         Assert.assertNotNull(localStore);
>         Assert.assertNotNull(testDriver.getAllStateStores().get("localStore"));
>         //
>         final KeyValueStore<String, String> globalStore = testDriver.getKeyValueStore("globalStore");
>         Assert.assertNotNull(globalStore);
>         Assert.assertNotNull(testDriver.getAllStateStores().get("globalStore"));
>         //
>         final ConsumerRecordFactory<String, String> crf =
>             new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer());
>         testDriver.pipeInput(crf.create("local", "one", "TheOne"));
>         testDriver.pipeInput(crf.create("global", "one", "TheOne"));
>         //
>         Assert.assertEquals("TheOne", localStore.get("one"));
>         Assert.assertEquals("TheOne", globalStore.get("one"));
>     }
>
> To make it work, I had to modify the TopologyTestDriver class as follows:
>
>     ...
>     public Map<String, StateStore> getAllStateStores() {
>         // final Map<String, StateStore> allStores = new HashMap<>();
>         // for (final String storeName : internalTopologyBuilder.allStateStoreName()) {
>         //     allStores.put(storeName, ((ProcessorContextImpl) task.context()).getStateMgr().getStore(storeName));
>         // }
>         // return allStores;
>         // FIXME
>         final ProcessorStateManager psm = ((ProcessorContextImpl) task.context()).getStateMgr();
>         final Map<String, StateStore> allStores = new HashMap<>();
>         for (final String storeName : internalTopologyBuilder.allStateStoreName()) {
>             StateStore res = psm.getStore(storeName);
>             if (res == null) {
>                 res = psm.getGlobalStore(storeName);
>             }
>             allStores.put(storeName, res);
>         }
>         return allStores;
>     }
>     ...
>     public StateStore getStateStore(final String name) {
>         // return ((ProcessorContextImpl) task.context()).getStateMgr().getStore(name);
>         // FIXME
>         final ProcessorStateManager psm = ((ProcessorContextImpl) task.context()).getStateMgr();
>         StateStore res = psm.getStore(name);
>         if (res == null) {
>             res = psm.getGlobalStore(name);
>         }
>         return res;
>     }
>
> Moreover, I think it would be very useful to make the internal MockProducer public, for testing cases where a producer is used alongside the "normal" stream processing, by adding the method:
>
>     /**
>      * @return records sent with this producer are automatically streamed to the topology.
>      */
>     public final Producer<byte[], byte[]> getProducer() {
>         return producer;
>     }
>
> Unfortunately, this introduces another problem, which can be verified by adding the following lines to the previous JUnit test:
>
>     ...
>     //
>     ConsumerRecord<byte[], byte[]> cr = crf.create("dummy", "two", "Second"); // just to serialize keys and values
>     testDriver.getProducer().send(new ProducerRecord<>("local", null, cr.timestamp(), cr.key(), cr.value()));
>     testDriver.getProducer().send(new ProducerRecord<>("global", null, cr.timestamp(), cr.key(), cr.value()));
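The fix proposed above boils down to a two-step lookup: try the task-local store first, then fall back to the global store. The dependency-free Java sketch below shows only that lookup order; the two `Function` arguments are hypothetical stand-ins for `ProcessorStateManager.getStore` and `getGlobalStore` as called in the reporter's patch, and `StoreLookup` is an illustrative name rather than part of `TopologyTestDriver`.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical sketch of the local-then-global store lookup from KAFKA-6742. */
final class StoreLookup {
    /** Returns the local store if present, otherwise the global one (null if neither exists). */
    static <S> S find(String name, Function<String, S> local, Function<String, S> global) {
        S store = local.apply(name);
        return store != null ? store : global.apply(name);
    }

    /** Builds a name -> store map using the same fallback, preserving the given name order. */
    static <S> Map<String, S> findAll(List<String> names,
                                      Function<String, S> local,
                                      Function<String, S> global) {
        Map<String, S> stores = new LinkedHashMap<>();
        for (String name : names) {
            stores.put(name, find(name, local, global));
        }
        return stores;
    }
}
```

Passing lookups as functions keeps the sketch testable without a running topology; in the driver itself the two calls would go to the same state manager instance, as in the patch.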
[jira] [Assigned] (KAFKA-6742) TopologyTestDriver error when dealing with stores from GlobalKTable
[ https://issues.apache.org/jira/browse/KAFKA-6742?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Matthias J. Sax reassigned KAFKA-6742:
    Assignee: Valentino Proietti

> TopologyTestDriver error when dealing with stores from GlobalKTable
>
> Key: KAFKA-6742
> URL: https://issues.apache.org/jira/browse/KAFKA-6742
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 1.1.0
> Reporter: Valentino Proietti
> Assignee: Valentino Proietti
> Priority: Minor
>
> This JUnit test fails:
>
>     @Test
>     public void globalTable() {
>         StreamsBuilder builder = new StreamsBuilder();
>
>         @SuppressWarnings("unused")
>         final KTable<String, String> localTable = builder
>             .table("local",
>                 Consumed.with(Serdes.String(), Serdes.String()),
>                 Materialized.as("localStore"));
>
>         @SuppressWarnings("unused")
>         final GlobalKTable<String, String> globalTable = builder
>             .globalTable("global",
>                 Consumed.with(Serdes.String(), Serdes.String()),
>                 Materialized.as("globalStore"));
>         //
>         Properties props = new Properties();
>         props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "test");
>         props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost");
>         TopologyTestDriver testDriver = new TopologyTestDriver(builder.build(), props);
>         //
>         final KeyValueStore<String, String> localStore = testDriver.getKeyValueStore("localStore");
>         Assert.assertNotNull(localStore);
>         Assert.assertNotNull(testDriver.getAllStateStores().get("localStore"));
>         //
>         final KeyValueStore<String, String> globalStore = testDriver.getKeyValueStore("globalStore");
>         Assert.assertNotNull(globalStore);
>         Assert.assertNotNull(testDriver.getAllStateStores().get("globalStore"));
>         //
>         final ConsumerRecordFactory<String, String> crf =
>             new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer());
>         testDriver.pipeInput(crf.create("local", "one", "TheOne"));
>         testDriver.pipeInput(crf.create("global", "one", "TheOne"));
>         //
>         Assert.assertEquals("TheOne", localStore.get("one"));
>         Assert.assertEquals("TheOne", globalStore.get("one"));
>     }
>
> To make it work, I had to modify the TopologyTestDriver class as follows:
>
>     ...
>     public Map<String, StateStore> getAllStateStores() {
>         // final Map<String, StateStore> allStores = new HashMap<>();
>         // for (final String storeName : internalTopologyBuilder.allStateStoreName()) {
>         //     allStores.put(storeName, ((ProcessorContextImpl) task.context()).getStateMgr().getStore(storeName));
>         // }
>         // return allStores;
>         // FIXME
>         final ProcessorStateManager psm = ((ProcessorContextImpl) task.context()).getStateMgr();
>         final Map<String, StateStore> allStores = new HashMap<>();
>         for (final String storeName : internalTopologyBuilder.allStateStoreName()) {
>             StateStore res = psm.getStore(storeName);
>             if (res == null) {
>                 res = psm.getGlobalStore(storeName);
>             }
>             allStores.put(storeName, res);
>         }
>         return allStores;
>     }
>     ...
>     public StateStore getStateStore(final String name) {
>         // return ((ProcessorContextImpl) task.context()).getStateMgr().getStore(name);
>         // FIXME
>         final ProcessorStateManager psm = ((ProcessorContextImpl) task.context()).getStateMgr();
>         StateStore res = psm.getStore(name);
>         if (res == null) {
>             res = psm.getGlobalStore(name);
>         }
>         return res;
>     }
>
> Moreover, I think it would be very useful to make the internal MockProducer public, for testing cases where a producer is used alongside the "normal" stream processing, by adding the method:
>
>     /**
>      * @return records sent with this producer are automatically streamed to the topology.
>      */
>     public final Producer<byte[], byte[]> getProducer() {
>         return producer;
>     }
>
> Unfortunately, this introduces another problem, which can be verified by adding the following lines to the previous JUnit test:
>
>     ...
>     //
>     ConsumerRecord<byte[], byte[]> cr = crf.create("dummy", "two", "Second"); // just to serialize keys and values
>     testDriver.getProducer().send(new ProducerRecord<>("local", null, cr.timestamp(), cr.key(), cr.value()));
>     testDriver.getProducer().send(new ProducerRecord<>("global", null, cr.timestamp(), cr.key(), cr.value()));
>     testDriver.advanceWallClockTime(0);
>     Assert.assertEquals("TheOne", localStore.get("one"));
>     Assert.assertEquals("Second",
[jira] [Created] (KAFKA-6763) Consider using direct byte buffers in SslTransportLayer
Ismael Juma created KAFKA-6763:

Summary: Consider using direct byte buffers in SslTransportLayer
Key: KAFKA-6763
URL: https://issues.apache.org/jira/browse/KAFKA-6763
Project: Kafka
Issue Type: Improvement
Reporter: Ismael Juma

We use heap byte buffers in SslTransportLayer. For netReadBuffer and netWriteBuffer, it means that the NIO layer has to copy to/from a native buffer before it can write/read to the socket. It would be good to test if switching to direct byte buffers improves performance. We can't be sure as the benefit of avoiding the copy could be offset by the specifics of the operations we perform on netReadBuffer, netWriteBuffer and appReadBuffer.

We should benchmark produce and consume performance and try a few combinations of direct/heap byte buffers for netReadBuffer, netWriteBuffer and appReadBuffer (the latter should probably remain as a heap byte buffer, but no harm in testing it too).
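The benchmark matrix suggested above has three buffers, each either heap or direct, so eight configurations. The Java sketch below is a hypothetical helper for driving such a benchmark: `SslBufferAllocator` and its `Role` values are illustrative names, not `SslTransportLayer` code, and the sketch only covers choosing the allocation strategy, not the produce/consume measurement itself.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;
import java.util.Set;

/**
 * Hypothetical allocator for benchmarking heap vs direct buffers in the
 * three roles named in KAFKA-6763. Not Kafka code.
 */
final class SslBufferAllocator {
    enum Role { NET_READ, NET_WRITE, APP_READ }

    private final Set<Role> directRoles;

    SslBufferAllocator(Set<Role> directRoles) {
        // Defensive copy; noneOf + addAll also works when the input set is empty.
        this.directRoles = EnumSet.noneOf(Role.class);
        this.directRoles.addAll(directRoles);
    }

    ByteBuffer allocate(Role role, int capacity) {
        // Direct buffers live outside the Java heap, so socket I/O can use them
        // without copying through a temporary native buffer.
        return directRoles.contains(role)
            ? ByteBuffer.allocateDirect(capacity)
            : ByteBuffer.allocate(capacity);
    }

    /** All 2^3 = 8 heap/direct combinations, one per benchmark run. */
    static List<Set<Role>> allCombinations() {
        Role[] roles = Role.values();
        List<Set<Role>> combos = new ArrayList<>();
        for (int mask = 0; mask < (1 << roles.length); mask++) {
            Set<Role> combo = EnumSet.noneOf(Role.class);
            for (int i = 0; i < roles.length; i++) {
                if ((mask & (1 << i)) != 0) {
                    combo.add(roles[i]);
                }
            }
            combos.add(combo);
        }
        return combos;
    }
}
```

Iterating over `allCombinations()` and constructing one allocator per entry would give each benchmark run a distinct configuration, including the all-heap baseline (the empty set).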
[jira] [Commented] (KAFKA-6535) Set default retention ms for Streams repartition topics to Long.MAX_VALUE
[ https://issues.apache.org/jira/browse/KAFKA-6535?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16429509#comment-16429509 ]

Matthias J. Sax commented on KAFKA-6535:

[~Khairy] Yes, it does. What is your wiki ID, so we can grant you permission?

> Set default retention ms for Streams repartition topics to Long.MAX_VALUE
>
> Key: KAFKA-6535
> URL: https://issues.apache.org/jira/browse/KAFKA-6535
> Project: Kafka
> Issue Type: Improvement
> Components: streams
> Reporter: Guozhang Wang
> Assignee: Khaireddine Rezgui
> Priority: Major
> Labels: needs-kip, newbie
>
> After KIP-220 / KIP-204, repartition topics in Streams are transient, so it is better to set their default retention to infinity, allowing any records with old timestamps to be pushed to them (think: bootstrapping, re-processing), and just rely on the purging API to keep their storage small.
>
> More specifically, in {{RepartitionTopicConfig}} we have a few default overrides for repartition topic configs; we should just add the override for {{TopicConfig.RETENTION_MS_CONFIG}} to set it to Long.MAX_VALUE. This still allows users to override it themselves if they want via {{StreamsConfig.TOPIC_PREFIX}}. We need to add a unit test to verify this update takes effect.
>
> In addition to the code change, we also need doc changes in streams/upgrade_guide.html describing this change to the default value.
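A minimal sketch of the layering described above, in plain Java with no Kafka dependency: apply `retention.ms = Long.MAX_VALUE` as the repartition-topic default, then let any `topic.`-prefixed user setting override it. The string literals stand in for `TopicConfig.RETENTION_MS_CONFIG` and `StreamsConfig.TOPIC_PREFIX`; the class is hypothetical and is not `RepartitionTopicConfig`, which also carries other defaults omitted here.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch of repartition-topic defaults layered under user overrides. */
final class RepartitionTopicDefaults {
    // Literal values of TopicConfig.RETENTION_MS_CONFIG and StreamsConfig.TOPIC_PREFIX.
    static final String RETENTION_MS = "retention.ms";
    static final String TOPIC_PREFIX = "topic.";

    static Map<String, String> resolve(Map<String, String> streamsConfig) {
        Map<String, String> topicConfig = new HashMap<>();
        // Proposed default: keep records indefinitely and rely on the purge API.
        topicConfig.put(RETENTION_MS, String.valueOf(Long.MAX_VALUE));
        // User-supplied "topic.*" settings are applied afterwards, so they win.
        for (Map.Entry<String, String> e : streamsConfig.entrySet()) {
            if (e.getKey().startsWith(TOPIC_PREFIX)) {
                topicConfig.put(e.getKey().substring(TOPIC_PREFIX.length()), e.getValue());
            }
        }
        return topicConfig;
    }
}
```

The two assertions the ticket asks for map directly onto this shape: with no user config the default is `Long.MAX_VALUE`, and with `topic.retention.ms` set the user value takes effect.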
[jira] [Commented] (KAFKA-3827) log.message.format.version should default to inter.broker.protocol.version
[ https://issues.apache.org/jira/browse/KAFKA-3827?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16429491#comment-16429491 ]

Manasvi Gupta commented on KAFKA-3827:

Hi All,

Just wanted to check if this is still a valid bug that needs to be fixed.

Thanks
Manasvi

> log.message.format.version should default to inter.broker.protocol.version
>
> Key: KAFKA-3827
> URL: https://issues.apache.org/jira/browse/KAFKA-3827
> Project: Kafka
> Issue Type: Improvement
> Affects Versions: 0.10.0.0
> Reporter: Jun Rao
> Assignee: Manasvi Gupta
> Priority: Major
> Labels: newbie
>
> Currently, if one sets inter.broker.protocol.version to 0.9.0 and restarts the broker, one will get the following exception, since log.message.format.version defaults to 0.10.0. It would be more intuitive if log.message.format.version defaulted to the value of inter.broker.protocol.version.
>
> java.lang.IllegalArgumentException: requirement failed: log.message.format.version 0.10.0-IV1 cannot be used when inter.broker.protocol.version is set to 0.9.0.1
>     at scala.Predef$.require(Predef.scala:233)
>     at kafka.server.KafkaConfig.validateValues(KafkaConfig.scala:1023)
>     at kafka.server.KafkaConfig.<init>(KafkaConfig.scala:994)
>     at kafka.server.KafkaConfig$.fromProps(KafkaConfig.scala:743)
>     at kafka.server.KafkaConfig$.fromProps(KafkaConfig.scala:740)
>     at kafka.server.KafkaServerStartable$.fromProps(KafkaServerStartable.scala:28)
>     at kafka.Kafka$.main(Kafka.scala:58)
>     at kafka.Kafka.main(Kafka.scala)
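The proposed default can be sketched as: if `log.message.format.version` is unset, inherit `inter.broker.protocol.version`, then run the same kind of "format must not be newer than the protocol" check that produced the exception above. This is a hypothetical Java sketch, not `KafkaConfig` itself; it compares simplified dotted versions such as `0.9.0` and `0.10.0` and ignores suffixes like `-IV1`.

```java
/**
 * Hypothetical sketch of the default proposed in KAFKA-3827. Versions are
 * simplified dotted numbers; real Kafka versions may carry "-IVn" suffixes.
 */
final class MessageFormatDefault {
    /** explicitFormat may be null, meaning "not configured". */
    static String resolve(String interBrokerProtocol, String explicitFormat) {
        String format = explicitFormat != null ? explicitFormat : interBrokerProtocol;
        if (compare(format, interBrokerProtocol) > 0) {
            throw new IllegalArgumentException("log.message.format.version " + format
                + " cannot be used when inter.broker.protocol.version is set to " + interBrokerProtocol);
        }
        return format;
    }

    /** Numeric, component-wise comparison so that "0.10.0" sorts after "0.9.0". */
    static int compare(String a, String b) {
        String[] x = a.split("\\.");
        String[] y = b.split("\\.");
        for (int i = 0; i < Math.max(x.length, y.length); i++) {
            int xi = i < x.length ? Integer.parseInt(x[i]) : 0;
            int yi = i < y.length ? Integer.parseInt(y[i]) : 0;
            if (xi != yi) {
                return Integer.compare(xi, yi);
            }
        }
        return 0;
    }
}
```

With this rule, `resolve("0.9.0", null)` returns `0.9.0` instead of failing, while an explicitly configured newer format such as `0.10.0` is still rejected.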
[jira] [Assigned] (KAFKA-6629) SegmentedCacheFunctionTest does not cover session window serdes
[ https://issues.apache.org/jira/browse/KAFKA-6629?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Manasvi Gupta reassigned KAFKA-6629:
    Assignee: Manasvi Gupta

> SegmentedCacheFunctionTest does not cover session window serdes
>
> Key: KAFKA-6629
> URL: https://issues.apache.org/jira/browse/KAFKA-6629
> Project: Kafka
> Issue Type: Improvement
> Components: streams
> Reporter: Guozhang Wang
> Assignee: Manasvi Gupta
> Priority: Major
> Labels: newbie, unit-test
>
> The SegmentedCacheFunctionTest.java only covers time window serdes, but not session window serdes. We should fill in this coverage gap.
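Since the gap is about test coverage, the sketch below shows the kind of round-trip assertion a session-window case needs: serialize a key together with its session start and end, then check that all three survive. The byte layout (key bytes followed by the end and start timestamps) and the class name are assumptions made for illustration only; a real test would exercise Kafka's own session key schema through `SegmentedCacheFunction`.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/**
 * Hypothetical session-key layout used only to illustrate a round-trip test:
 * [key bytes][session end ms][session start ms]. Not necessarily Kafka's format.
 */
final class SessionKeyRoundTrip {
    private static final int TIMESTAMPS = 2 * Long.BYTES;

    static byte[] toBinary(String key, long start, long end) {
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(keyBytes.length + TIMESTAMPS)
            .put(keyBytes)
            .putLong(end)
            .putLong(start)
            .array();
    }

    static String extractKey(byte[] binary) {
        return new String(binary, 0, binary.length - TIMESTAMPS, StandardCharsets.UTF_8);
    }

    static long extractEnd(byte[] binary) {
        return ByteBuffer.wrap(binary).getLong(binary.length - TIMESTAMPS);
    }

    static long extractStart(byte[] binary) {
        return ByteBuffer.wrap(binary).getLong(binary.length - Long.BYTES);
    }
}
```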
[jira] [Commented] (KAFKA-6760) responses not logged properly in controller
[ https://issues.apache.org/jira/browse/KAFKA-6760?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16429417#comment-16429417 ]

ASF GitHub Bot commented on KAFKA-6760:

mimaison opened a new pull request #4834: KAFKA-6760: Responses not logged properly in controller
URL: https://github.com/apache/kafka/pull/4834

   Added toString() to LeaderAndIsrResponse and StopReplicaResponse

   *More detailed description of your change, if necessary. The PR title and PR message become the squashed commit message, so use a separate comment to ping reviewers.*

   *Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.*

   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

> responses not logged properly in controller
>
> Key: KAFKA-6760
> URL: https://issues.apache.org/jira/browse/KAFKA-6760
> Project: Kafka
> Issue Type: Improvement
> Components: core
> Affects Versions: 1.1.0
> Reporter: Jun Rao
> Priority: Major
> Labels: newbie
>
> Saw the following logging in controller.log. We need to log the StopReplicaResponse properly in KafkaController.
>
> [2018-04-05 14:38:41,878] DEBUG [Controller id=0] Delete topic callback invoked for org.apache.kafka.common.requests.StopReplicaResponse@263d40c (kafka.controller.KafkaController)
>
> It seems that the same issue exists for LeaderAndIsrResponse as well.
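The log line above is the default `Object.toString()` output: class name, `@`, and the hexadecimal hash code. The PR adds `toString()` to the response classes; the sketch below illustrates the idea on a hypothetical stand-in class, since the real fields of `StopReplicaResponse` are not shown here. The field names and the output format are assumptions.

```java
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical stand-in for a response class; fields and format are illustrative. */
final class StopReplicaResponseSketch {
    private final short error;
    private final Map<String, Short> partitionErrors; // "topic-partition" -> error code

    StopReplicaResponseSketch(short error, Map<String, Short> partitionErrors) {
        this.error = error;
        // Sorted copy so the logged output is stable across runs.
        this.partitionErrors = new TreeMap<>(partitionErrors);
    }

    @Override
    public String toString() {
        // Without this override, logging would print ClassName@hashCode.
        return "StopReplicaResponse(error=" + error + ", partitionErrors=" + partitionErrors + ")";
    }
}
```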
[jira] [Commented] (KAFKA-6762) log-cleaner thread terminates due to java.lang.IllegalStateException
[ https://issues.apache.org/jira/browse/KAFKA-6762?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16429416#comment-16429416 ]

Ted Yu commented on KAFKA-6762:

Here is the related code:
{code}
if(readBuffer.capacity >= maxBufferSize || writeBuffer.capacity >= maxBufferSize)
  throw new IllegalStateException("This log contains a message larger than maximum allowable size of %s.".format(maxBufferSize))
{code}
At the very least, we should log the read/write buffer capacity in the exception message.

This is the next line:
{code}
val newSize = math.min(this.readBuffer.capacity * 2, maxBufferSize)
{code}
I think the condition for throwing IllegalStateException should be relaxed: if we cannot grow the buffer by doubling, we can grow it by some increment large enough to complete log cleaning.

Can you turn on DEBUG logging in your cluster? Maybe we can get more information from the DEBUG log.

Thanks

> log-cleaner thread terminates due to java.lang.IllegalStateException
>
> Key: KAFKA-6762
> URL: https://issues.apache.org/jira/browse/KAFKA-6762
> Project: Kafka
> Issue Type: Bug
> Components: core
> Affects Versions: 1.0.0
> Environment: os: GNU/Linux
> arch: x86_64
> Kernel: 4.9.77
> jvm: OpenJDK 1.8.0
> Reporter: Ricardo Bartolome
> Priority: Major
>
> We are experiencing some problems with the Kafka log-cleaner thread on Kafka 1.0.0. We plan to upgrade this cluster to 1.1.0 next week in order to fix KAFKA-6683, but until then we can only confirm that it happens in 1.0.0.
>
> The log-cleaner thread crashes after a while with the following error:
> {code:java}
> [2018-03-28 11:14:40,199] INFO Cleaner 0: Beginning cleaning of log __consumer_offsets-31. (kafka.log.LogCleaner)
> [2018-03-28 11:14:40,199] INFO Cleaner 0: Building offset map for __consumer_offsets-31... (kafka.log.LogCleaner)
> [2018-03-28 11:14:40,218] INFO Cleaner 0: Building offset map for log __consumer_offsets-31 for 16 segments in offset range [1612869, 14282934). (kafka.log.LogCleaner)
> [2018-03-28 11:14:58,566] INFO Cleaner 0: Offset map for log __consumer_offsets-31 complete. (kafka.log.LogCleaner)
> [2018-03-28 11:14:58,566] INFO Cleaner 0: Cleaning log __consumer_offsets-31 (cleaning prior to Tue Mar 27 09:25:09 GMT 2018, discarding tombstones prior to Sat Feb 24 11:04:21 GMT 2018)... (kafka.log.LogCleaner)
> [2018-03-28 11:14:58,567] INFO Cleaner 0: Cleaning segment 0 in log __consumer_offsets-31 (largest timestamp Fri Feb 23 11:40:54 GMT 2018) into 0, discarding deletes. (kafka.log.LogCleaner)
> [2018-03-28 11:14:58,570] INFO Cleaner 0: Growing cleaner I/O buffers from 262144bytes to 524288 bytes. (kafka.log.LogCleaner)
> [2018-03-28 11:14:58,576] INFO Cleaner 0: Growing cleaner I/O buffers from 524288bytes to 1000012 bytes. (kafka.log.LogCleaner)
> [2018-03-28 11:14:58,593] ERROR [kafka-log-cleaner-thread-0]: Error due to (kafka.log.LogCleaner)
> java.lang.IllegalStateException: This log contains a message larger than maximum allowable size of 1000012.
>     at kafka.log.Cleaner.growBuffers(LogCleaner.scala:622)
>     at kafka.log.Cleaner.cleanInto(LogCleaner.scala:574)
>     at kafka.log.Cleaner.cleanSegments(LogCleaner.scala:459)
>     at kafka.log.Cleaner.$anonfun$doClean$6(LogCleaner.scala:396)
>     at kafka.log.Cleaner.$anonfun$doClean$6$adapted(LogCleaner.scala:395)
>     at scala.collection.immutable.List.foreach(List.scala:389)
>     at kafka.log.Cleaner.doClean(LogCleaner.scala:395)
>     at kafka.log.Cleaner.clean(LogCleaner.scala:372)
>     at kafka.log.LogCleaner$CleanerThread.cleanOrSleep(LogCleaner.scala:263)
>     at kafka.log.LogCleaner$CleanerThread.doWork(LogCleaner.scala:243)
>     at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:64)
> [2018-03-28 11:14:58,601] INFO [kafka-log-cleaner-thread-0]: Stopped (kafka.log.LogCleaner)
> [2018-04-04 14:25:12,773] INFO The cleaning for partition __broker-11-health-check-0 is aborted and paused (kafka.log.LogCleaner)
> [2018-04-04 14:25:12,773] INFO Compaction for partition __broker-11-health-check-0 is resumed (kafka.log.LogCleaner)
> [2018-04-04 14:25:12,774] INFO The cleaning for partition __broker-11-health-check-0 is aborted (kafka.log.LogCleaner)
> [2018-04-04 14:25:22,850] INFO Shutting down the log cleaner. (kafka.log.LogCleaner)
> [2018-04-04 14:25:22,850] INFO [kafka-log-cleaner-thread-0]: Shutting down (kafka.log.LogCleaner)
> [2018-04-04 14:25:22,850] INFO [kafka-log-cleaner-thread-0]: Shutdown completed (kafka.log.LogCleaner)
> {code}
> What we know so far is:
> * We are unable to reproduce it yet in a consistent manner.
> * It only happens in the PRO cluster and not in the PRE cluster for
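Ted Yu's suggestion can be sketched as a pure sizing function: double when possible, never allocate less than the message actually needs, fail only when that need exceeds the maximum, and report the capacities in the error. This is a hypothetical Java illustration of the proposal (the project code is Scala), not `LogCleaner`'s implementation; `current`, `needed` and `max` stand in for the buffer capacity, the size of the message that must fit, and `maxBufferSize`.

```java
/**
 * Hypothetical sketch of relaxed cleaner-buffer growth, following the
 * suggestion in the KAFKA-6762 comments. Not Kafka's LogCleaner code.
 */
final class CleanerBufferGrowth {
    static int nextSize(int current, int needed, int max) {
        if (needed > max) {
            // Only give up when the message genuinely cannot fit, and say why.
            throw new IllegalStateException(String.format(
                "Message of %d bytes exceeds maximum allowable size of %d (current buffer capacity %d).",
                needed, max, current));
        }
        // Prefer doubling, but grow at least to what is needed and never beyond max.
        long doubled = 2L * current;
        return (int) Math.min(Math.max(doubled, needed), max);
    }
}
```

For the sizes in the log above, `nextSize(262144, 300000, 1000012)` gives 524288, and `nextSize(524288, 900000, 1000012)` caps at 1000012; an exception is raised only when `needed` exceeds 1000012.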
[jira] [Updated] (KAFKA-6762) log-cleaner thread terminates due to java.lang.IllegalStateException
[ https://issues.apache.org/jira/browse/KAFKA-6762?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ricardo Bartolome updated KAFKA-6762:
    Description:

We are experiencing some problems with the Kafka log-cleaner thread on Kafka 1.0.0. We plan to upgrade this cluster to 1.1.0 next week in order to fix KAFKA-6683, but until then we can only confirm that it happens in 1.0.0.

The log-cleaner thread crashes after a while with the following error:
{code:java}
[2018-03-28 11:14:40,199] INFO Cleaner 0: Beginning cleaning of log __consumer_offsets-31. (kafka.log.LogCleaner)
[2018-03-28 11:14:40,199] INFO Cleaner 0: Building offset map for __consumer_offsets-31... (kafka.log.LogCleaner)
[2018-03-28 11:14:40,218] INFO Cleaner 0: Building offset map for log __consumer_offsets-31 for 16 segments in offset range [1612869, 14282934). (kafka.log.LogCleaner)
[2018-03-28 11:14:58,566] INFO Cleaner 0: Offset map for log __consumer_offsets-31 complete. (kafka.log.LogCleaner)
[2018-03-28 11:14:58,566] INFO Cleaner 0: Cleaning log __consumer_offsets-31 (cleaning prior to Tue Mar 27 09:25:09 GMT 2018, discarding tombstones prior to Sat Feb 24 11:04:21 GMT 2018)... (kafka.log.LogCleaner)
[2018-03-28 11:14:58,567] INFO Cleaner 0: Cleaning segment 0 in log __consumer_offsets-31 (largest timestamp Fri Feb 23 11:40:54 GMT 2018) into 0, discarding deletes. (kafka.log.LogCleaner)
[2018-03-28 11:14:58,570] INFO Cleaner 0: Growing cleaner I/O buffers from 262144bytes to 524288 bytes. (kafka.log.LogCleaner)
[2018-03-28 11:14:58,576] INFO Cleaner 0: Growing cleaner I/O buffers from 524288bytes to 1000012 bytes. (kafka.log.LogCleaner)
[2018-03-28 11:14:58,593] ERROR [kafka-log-cleaner-thread-0]: Error due to (kafka.log.LogCleaner)
java.lang.IllegalStateException: This log contains a message larger than maximum allowable size of 1000012.
    at kafka.log.Cleaner.growBuffers(LogCleaner.scala:622)
    at kafka.log.Cleaner.cleanInto(LogCleaner.scala:574)
    at kafka.log.Cleaner.cleanSegments(LogCleaner.scala:459)
    at kafka.log.Cleaner.$anonfun$doClean$6(LogCleaner.scala:396)
    at kafka.log.Cleaner.$anonfun$doClean$6$adapted(LogCleaner.scala:395)
    at scala.collection.immutable.List.foreach(List.scala:389)
    at kafka.log.Cleaner.doClean(LogCleaner.scala:395)
    at kafka.log.Cleaner.clean(LogCleaner.scala:372)
    at kafka.log.LogCleaner$CleanerThread.cleanOrSleep(LogCleaner.scala:263)
    at kafka.log.LogCleaner$CleanerThread.doWork(LogCleaner.scala:243)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:64)
[2018-03-28 11:14:58,601] INFO [kafka-log-cleaner-thread-0]: Stopped (kafka.log.LogCleaner)
[2018-04-04 14:25:12,773] INFO The cleaning for partition __broker-11-health-check-0 is aborted and paused (kafka.log.LogCleaner)
[2018-04-04 14:25:12,773] INFO Compaction for partition __broker-11-health-check-0 is resumed (kafka.log.LogCleaner)
[2018-04-04 14:25:12,774] INFO The cleaning for partition __broker-11-health-check-0 is aborted (kafka.log.LogCleaner)
[2018-04-04 14:25:22,850] INFO Shutting down the log cleaner. (kafka.log.LogCleaner)
[2018-04-04 14:25:22,850] INFO [kafka-log-cleaner-thread-0]: Shutting down (kafka.log.LogCleaner)
[2018-04-04 14:25:22,850] INFO [kafka-log-cleaner-thread-0]: Shutdown completed (kafka.log.LogCleaner)
{code}
What we know so far is:
* We are unable to reproduce it yet in a consistent manner.
* It only happens in the PRO cluster and not in the PRE cluster for the same customer (whose message payloads are very similar).
* Checking our Kafka logs, it only happened on the internal topics *__consumer_offsets-**
* When we restart the broker process, the log-cleaner starts working again, but it can take between 3 minutes and some hours to die again.
* We worked around it by temporarily increasing the message.max.bytes and replica.fetch.max.bytes values to 10485760 (10MB) from the default 1000012 (~1MB).
** Before setting message.max.bytes = 10MB, we tried to match message.max.bytes with the value of replica.fetch.max.bytes (1048576), but the log-cleaner died with the same error and a different limit.
** This allowed the log-cleaner not to die and to compact enough data for disk usage to drop from ~600GB to ~100GB.
** Without this limit change, the log-cleaner dies after a while, and the used disk space stays at ~450GB and starts growing again due to cluster activity.

Our server.properties content is as follows, as printed in server.log at broker startup.
{code:java}
broker.id=11
delete.topic.enable=true
advertised.listeners=PLAINTEXT://broker-ip:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/var/lib/kafka
num.partitions=12
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
[jira] [Created] (KAFKA-6762) log-cleaner thread terminates due to java.lang.IllegalStateException
Ricardo Bartolome created KAFKA-6762: Summary: log-cleaner thread terminates due to java.lang.IllegalStateException Key: KAFKA-6762 URL: https://issues.apache.org/jira/browse/KAFKA-6762 Project: Kafka Issue Type: Bug Components: core Affects Versions: 1.0.0 Environment: os: GNU/Linux arch: x86_64 Kernel: 4.9.77 jvm: OpenJDK 1.8.0 Reporter: Ricardo Bartolome We are experiencing some problems with kafka log-cleaner thread on Kafka 1.0.0. We have planned to update this cluster to 1.1.0 by next week in order to fix KAFKA-6683, but until then we can only confirm that it happens in 1.0.0. log-cleaner thread crashes after a while with the following error: {code:java} [2018-03-28 11:14:40,199] INFO Cleaner 0: Beginning cleaning of log __consumer_offsets-31. (kafka.log.LogCleaner) [2018-03-28 11:14:40,199] INFO Cleaner 0: Building offset map for __consumer_offsets-31... (kafka.log.LogCleaner) [2018-03-28 11:14:40,218] INFO Cleaner 0: Building offset map for log __consumer_offsets-31 for 16 segments in offset range [1612869, 14282934). (kafka.log.LogCleaner) [2018-03-28 11:14:58,566] INFO Cleaner 0: Offset map for log __consumer_offsets-31 complete. (kafka.log.LogCleaner) [2018-03-28 11:14:58,566] INFO Cleaner 0: Cleaning log __consumer_offsets-31 (cleaning prior to Tue Mar 27 09:25:09 GMT 2018, discarding tombstones prior to Sat Feb 24 11:04:21 GMT 2018 )... (kafka.log.LogCleaner) [2018-03-28 11:14:58,567] INFO Cleaner 0: Cleaning segment 0 in log __consumer_offsets-31 (largest timestamp Fri Feb 23 11:40:54 GMT 2018) into 0, discarding deletes. (kafka.log.LogClea ner) [2018-03-28 11:14:58,570] INFO Cleaner 0: Growing cleaner I/O buffers from 262144bytes to 524288 bytes. (kafka.log.LogCleaner) [2018-03-28 11:14:58,576] INFO Cleaner 0: Growing cleaner I/O buffers from 524288bytes to 112 bytes. 
(kafka.log.LogCleaner) [2018-03-28 11:14:58,593] ERROR [kafka-log-cleaner-thread-0]: Error due to (kafka.log.LogCleaner) java.lang.IllegalStateException: This log contains a message larger than maximum allowable size of 112. at kafka.log.Cleaner.growBuffers(LogCleaner.scala:622) at kafka.log.Cleaner.cleanInto(LogCleaner.scala:574) at kafka.log.Cleaner.cleanSegments(LogCleaner.scala:459) at kafka.log.Cleaner.$anonfun$doClean$6(LogCleaner.scala:396) at kafka.log.Cleaner.$anonfun$doClean$6$adapted(LogCleaner.scala:395) at scala.collection.immutable.List.foreach(List.scala:389) at kafka.log.Cleaner.doClean(LogCleaner.scala:395) at kafka.log.Cleaner.clean(LogCleaner.scala:372) at kafka.log.LogCleaner$CleanerThread.cleanOrSleep(LogCleaner.scala:263) at kafka.log.LogCleaner$CleanerThread.doWork(LogCleaner.scala:243) at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:64) [2018-03-28 11:14:58,601] INFO [kafka-log-cleaner-thread-0]: Stopped (kafka.log.LogCleaner) [2018-04-04 14:25:12,773] INFO The cleaning for partition __broker-11-health-check-0 is aborted and paused (kafka.log.LogCleaner) [2018-04-04 14:25:12,773] INFO Compaction for partition __broker-11-health-check-0 is resumed (kafka.log.LogCleaner) [2018-04-04 14:25:12,774] INFO The cleaning for partition __broker-11-health-check-0 is aborted (kafka.log.LogCleaner) [2018-04-04 14:25:22,850] INFO Shutting down the log cleaner. (kafka.log.LogCleaner) [2018-04-04 14:25:22,850] INFO [kafka-log-cleaner-thread-0]: Shutting down (kafka.log.LogCleaner) [2018-04-04 14:25:22,850] INFO [kafka-log-cleaner-thread-0]: Shutdown completed (kafka.log.LogCleaner) {code} What we know so far is: * We are unable to reproduce it yet in a consistent manner. 
* It only happens in the PRO cluster and not in the PRE cluster for the same customer (whose message payloads are very similar).
* According to our Kafka logs, it only happened on the internal topics *__consumer_offsets-**.
* When we restart the broker process, the log-cleaner starts working again, but it can die again anywhere from 3 minutes to several hours later.
* We worked around it by temporarily increasing message.max.bytes and replica.fetch.max.bytes to 10485760 (10MB) from the default 1000012 (~1MB).
** Before setting message.max.bytes = 10MB, we tried matching message.max.bytes to the value of replica.fetch.max.bytes (1048576), but the log-cleaner died with the same error and a different limit.
** This kept the log-cleaner alive long enough to compact enough data for disk usage to drop from ~600GB to ~100GB.
** Without this limit change, the log-cleaner dies after a while, and used disk space stays at ~450GB and then starts growing again due to cluster activity.

Our server.properties content is as follows, as printed in server.log at broker startup.
{code:java}
broker.id=11
delete.topic.enable=true
advertised.listeners=PLAINTEXT://broker-ip:9092
num.network.threads=3
[jira] [Commented] (KAFKA-6535) Set default retention ms for Streams repartition topics to Long.MAX_VALUE
[ https://issues.apache.org/jira/browse/KAFKA-6535?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16429352#comment-16429352 ]

Khaireddine Rezgui commented on KAFKA-6535:
---

Hi [~vvcephei], I have access to the Confluence wiki, but I don't have the "Create" menu. Does it require a permission?

> Set default retention ms for Streams repartition topics to Long.MAX_VALUE
> -
>
> Key: KAFKA-6535
> URL: https://issues.apache.org/jira/browse/KAFKA-6535
> Project: Kafka
> Issue Type: Improvement
> Components: streams
> Reporter: Guozhang Wang
> Assignee: Khaireddine Rezgui
> Priority: Major
> Labels: needs-kip, newbie
>
> After KIP-220 / KIP-204, repartition topics in Streams are transient, so it is better to set their default retention to infinity, allowing any records to be pushed to them with old timestamps (think: bootstrapping, re-processing), and to rely on the purging API to keep their storage small.
> More specifically, {{RepartitionTopicConfig}} has a few default overrides for repartition topic configs; we should add an override for {{TopicConfig.RETENTION_MS_CONFIG}} that sets it to Long.MAX_VALUE. This still allows users to override it themselves via {{StreamsConfig.TOPIC_PREFIX}}. We need to add a unit test to verify that this update takes effect.
> In addition to the code change, we also need doc changes in streams/upgrade_guide.html describing this default value change.

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
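A minimal Java sketch of the override proposed in KAFKA-6535, assuming a simplified stand-in for {{RepartitionTopicConfig}}. The class RepartitionTopicConfigSketch and its effectiveConfig method are hypothetical and are not Kafka Streams API. The sketch uses the string values behind TopicConfig.RETENTION_MS_CONFIG ("retention.ms") and StreamsConfig.TOPIC_PREFIX ("topic.") so it compiles without Kafka on the classpath, and it omits the other existing default overrides.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical sketch, not the real Kafka Streams RepartitionTopicConfig:
 * repartition topics default to retention.ms = Long.MAX_VALUE, while users
 * can still override it through "topic."-prefixed Streams properties.
 */
final class RepartitionTopicConfigSketch {
    // Same string values as TopicConfig.RETENTION_MS_CONFIG and StreamsConfig.TOPIC_PREFIX.
    static final String RETENTION_MS_CONFIG = "retention.ms";
    static final String TOPIC_PREFIX = "topic.";

    // Default overrides for repartition topics (other existing overrides omitted).
    static final Map<String, String> DEFAULT_OVERRIDES;

    static {
        Map<String, String> defaults = new HashMap<>();
        // Records are removed via the purging API, so never expire them by time.
        defaults.put(RETENTION_MS_CONFIG, String.valueOf(Long.MAX_VALUE));
        DEFAULT_OVERRIDES = Collections.unmodifiableMap(defaults);
    }

    private RepartitionTopicConfigSketch() {}

    /** Effective topic config: defaults first, then user "topic.*" overrides on top. */
    static Map<String, String> effectiveConfig(Map<String, String> streamsProps) {
        Map<String, String> config = new HashMap<>(DEFAULT_OVERRIDES);
        for (Map.Entry<String, String> e : streamsProps.entrySet()) {
            if (e.getKey().startsWith(TOPIC_PREFIX)) {
                // Strip the prefix so "topic.retention.ms" replaces "retention.ms".
                config.put(e.getKey().substring(TOPIC_PREFIX.length()), e.getValue());
            }
        }
        return config;
    }
}
```

Because the defaults are copied first and prefixed user properties are applied afterwards, a Streams property such as topic.retention.ms=86400000 still takes precedence over the Long.MAX_VALUE default, which is the user-override behavior the issue asks to preserve.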