[jira] [Updated] (KAFKA-6764) ConsoleConsumer behavior inconsistent when specifying --partition with --from-beginning

2018-04-07 Thread Larry McQueary (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6764?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Larry McQueary updated KAFKA-6764:
--
Summary: ConsoleConsumer behavior inconsistent when specifying --partition 
with --from-beginning   (was: ConsoleConsumer behavior inconsistent when 
specifying --partition --from-beginning )

> ConsoleConsumer behavior inconsistent when specifying --partition with 
> --from-beginning 
> 
>
> Key: KAFKA-6764
> URL: https://issues.apache.org/jira/browse/KAFKA-6764
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Reporter: Larry McQueary
>Priority: Minor
>  Labels: newbie
>
> Per its usage statement, {{kafka-console-consumer.sh}} ignores 
> {{\-\-from-beginning}} when the specified consumer group has committed 
> offsets, and sets {{auto.offset.reset}} to {{latest}}. However, if 
> {{\-\-partition}} is also specified, {{\-\-from-beginning}} is observed in 
> all cases, whether there are committed offsets or not.
> This happens because when {{\-\-from-beginning}} is specified, {{offsetArg}} 
> is set to {{OffsetRequest.EarliestTime}}. However, {{offsetArg}} is [only 
> passed to the 
> constructor|https://github.com/apache/kafka/blob/fedac0cea74fce529ee1c0cefd6af53ecbdd/core/src/main/scala/kafka/tools/ConsoleConsumer.scala#L76-L79]
>  for {{NewShinyConsumer}} when {{\-\-partition}} is also specified.
> This case should either be handled consistently, or the usage statement 
> should be modified to indicate the actual behavior/usage. 





[jira] [Created] (KAFKA-6764) ConsoleConsumer behavior inconsistent when specifying --partition --from-beginning

2018-04-07 Thread Larry McQueary (JIRA)
Larry McQueary created KAFKA-6764:
-

 Summary: ConsoleConsumer behavior inconsistent when specifying 
--partition --from-beginning 
 Key: KAFKA-6764
 URL: https://issues.apache.org/jira/browse/KAFKA-6764
 Project: Kafka
  Issue Type: Bug
  Components: consumer
Reporter: Larry McQueary


Per its usage statement, {{kafka-console-consumer.sh}} ignores 
{{\-\-from-beginning}} when the specified consumer group has committed offsets, 
and sets {{auto.offset.reset}} to {{latest}}. However, if {{\-\-partition}} is 
also specified, {{\-\-from-beginning}} is observed in all cases, whether there 
are committed offsets or not.

This happens because when {{\-\-from-beginning}} is specified, {{offsetArg}} is 
set to {{OffsetRequest.EarliestTime}}. However, {{offsetArg}} is [only passed 
to the 
constructor|https://github.com/apache/kafka/blob/fedac0cea74fce529ee1c0cefd6af53ecbdd/core/src/main/scala/kafka/tools/ConsoleConsumer.scala#L76-L79]
 for {{NewShinyConsumer}} when {{\-\-partition}} is also specified.

This case should either be handled consistently, or the usage statement should 
be modified to indicate the actual behavior/usage. 
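For illustration, here is a minimal sketch using the plain Java consumer (not the ConsoleConsumer code itself; topic, group and bootstrap values are placeholders) of why the two code paths behave differently: with a group subscription, committed offsets take precedence and {{auto.offset.reset}} only applies when none exist, whereas with a manual partition assignment the start offset is chosen explicitly, so seeking to the beginning always wins.
{code:java}
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class OffsetResetSketch {
    public static void main(String[] args) {
        Properties cfg = new Properties();
        cfg.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");   // placeholder
        cfg.put(ConsumerConfig.GROUP_ID_CONFIG, "console-consumer-12345");    // placeholder
        cfg.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        cfg.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
        cfg.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(cfg)) {
            // Group path (no --partition): committed offsets win; auto.offset.reset
            // only matters when the group has no committed offsets yet.
            consumer.subscribe(Collections.singletonList("my-topic"));        // placeholder topic

            // Partition path (--partition 0): the partition is assigned directly and the
            // start offset is picked explicitly, so "from beginning" is always honored.
            // consumer.assign(Collections.singletonList(new TopicPartition("my-topic", 0)));
            // consumer.seekToBeginning(consumer.assignment());
        }
    }
}
{code}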






[jira] [Commented] (KAFKA-2526) Console Producer / Consumer's serde config is not working

2018-04-07 Thread huxihx (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2526?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16429606#comment-16429606
 ] 

huxihx commented on KAFKA-2526:
---

[~mgharat] Are you still working on this JIRA?

> Console Producer / Consumer's serde config is not working
> -
>
> Key: KAFKA-2526
> URL: https://issues.apache.org/jira/browse/KAFKA-2526
> Project: Kafka
>  Issue Type: Bug
>Reporter: Guozhang Wang
>Assignee: Mayuresh Gharat
>Priority: Major
>  Labels: newbie
>
> Although one can specify the key/value serializers in the console producer, 
> they are actually not used, since 1) it always serializes the input string via 
> String.getBytes (hence it always pre-assumes the string serializer) and 2) the 
> setting is actually only passed into the old producer. The same issues exist in 
> the console consumer.
> In addition, the configs in the console producer are messy: we have 1) some 
> config values exposed as cmd parameters, 2) some config values in 
> --producer-property, and 3) some in --property.
> It would be great to clean up the configs in both the console producer and 
> consumer, and put them into a single --property parameter, which could possibly 
> also take a file to read property values from, leaving only --new-producer as 
> the other command line parameter.
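As a rough illustration of what honoring the configured serde could look like (a hedged sketch only; property names and the wiring are illustrative, not the actual ConsoleProducer code), the serializer class supplied via {{--property}} would be instantiated and used instead of hard-coding {{String.getBytes}}:
{code:java}
import java.util.HashMap;
import java.util.Properties;

import org.apache.kafka.common.serialization.Serializer;

public class SerdeConfigSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // In the proposal these would come from a single --property flag (or a file).
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        @SuppressWarnings("unchecked")
        Serializer<String> valueSerializer = (Serializer<String>)
                Class.forName(props.getProperty("value.serializer")).newInstance();
        valueSerializer.configure(new HashMap<String, Object>(), false /* isKey */);

        // Used instead of the hard-coded "line.getBytes()" the ticket describes.
        byte[] payload = valueSerializer.serialize("my-topic", "hello world");
        System.out.println("serialized " + payload.length + " bytes");
    }
}
{code}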





[jira] [Commented] (KAFKA-6742) TopologyTestDriver error when dealing with stores from GlobalKTable

2018-04-07 Thread Matthias J. Sax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6742?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16429570#comment-16429570
 ] 

Matthias J. Sax commented on KAFKA-6742:


[~vale68] I added you to the list of contributors and assigned this ticket to 
you. You can assign tickets to yourself now, too.

> TopologyTestDriver error when dealing with stores from GlobalKTable
> ---
>
> Key: KAFKA-6742
> URL: https://issues.apache.org/jira/browse/KAFKA-6742
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.1.0
>Reporter: Valentino Proietti
>Assignee: Valentino Proietti
>Priority: Minor
>
> This JUnit test simply fails:
> {code:java}
> @Test
> public void globalTable() {
>     StreamsBuilder builder = new StreamsBuilder();
>     @SuppressWarnings("unused")
>     final KTable<String, String> localTable = builder
>         .table("local",
>                Consumed.with(Serdes.String(), Serdes.String()),
>                Materialized.as("localStore"));
>     @SuppressWarnings("unused")
>     final GlobalKTable<String, String> globalTable = builder
>         .globalTable("global",
>                      Consumed.with(Serdes.String(), Serdes.String()),
>                      Materialized.as("globalStore"));
>
>     Properties props = new Properties();
>     props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "test");
>     props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost");
>     TopologyTestDriver testDriver = new TopologyTestDriver(builder.build(), props);
>
>     final KeyValueStore<String, String> localStore = testDriver.getKeyValueStore("localStore");
>     Assert.assertNotNull(localStore);
>     Assert.assertNotNull(testDriver.getAllStateStores().get("localStore"));
>
>     final KeyValueStore<String, String> globalStore = testDriver.getKeyValueStore("globalStore");
>     Assert.assertNotNull(globalStore);
>     Assert.assertNotNull(testDriver.getAllStateStores().get("globalStore"));
>
>     final ConsumerRecordFactory<String, String> crf =
>         new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer());
>     testDriver.pipeInput(crf.create("local", "one", "TheOne"));
>     testDriver.pipeInput(crf.create("global", "one", "TheOne"));
>
>     Assert.assertEquals("TheOne", localStore.get("one"));
>     Assert.assertEquals("TheOne", globalStore.get("one"));
> }
> {code}
> To make it work I had to modify the TopologyTestDriver class as follows:
> {code:java}
> ...
>     public Map<String, StateStore> getAllStateStores() {
> //        final Map<String, StateStore> allStores = new HashMap<>();
> //        for (final String storeName : internalTopologyBuilder.allStateStoreName()) {
> //            allStores.put(storeName, ((ProcessorContextImpl) task.context()).getStateMgr().getStore(storeName));
> //        }
> //        return allStores;
>         // FIXME
>         final ProcessorStateManager psm = ((ProcessorContextImpl) task.context()).getStateMgr();
>         final Map<String, StateStore> allStores = new HashMap<>();
>         for (final String storeName : internalTopologyBuilder.allStateStoreName()) {
>             StateStore res = psm.getStore(storeName);
>             if (res == null)
>                 res = psm.getGlobalStore(storeName);
>             allStores.put(storeName, res);
>         }
>         return allStores;
>     }
> ...
>     public StateStore getStateStore(final String name) {
> //        return ((ProcessorContextImpl) task.context()).getStateMgr().getStore(name);
>         // FIXME
>         final ProcessorStateManager psm = ((ProcessorContextImpl) task.context()).getStateMgr();
>         StateStore res = psm.getStore(name);
>         if (res == null)
>             res = psm.getGlobalStore(name);
>         return res;
>     }
> {code}
> Moreover, I think it would be very useful to make the internal MockProducer public for testing
> cases where a producer is used alongside the "normal" stream processing, by adding the method:
> {code:java}
>     /**
>      * @return records sent with this producer are automatically streamed to the topology.
>      */
>     public final Producer<byte[], byte[]> getProducer() {
>         return producer;
>     }
> {code}
> Unfortunately this introduces another problem, which can be verified by adding the following
> lines to the previous JUnit test:
> {code:java}
> ...
> ConsumerRecord<byte[], byte[]> cr = crf.create("dummy", "two", "Second"); // just to serialize keys and values
> testDriver.getProducer().send(new ProducerRecord<>("local", null, cr.timestamp(), cr.key(), cr.value()));
> testDriver.getProducer().send(new ProducerRecord<>("global", null, cr.timestamp(), cr.key(), cr.value()));
> {code}

[jira] [Assigned] (KAFKA-6742) TopologyTestDriver error when dealing with stores from GlobalKTable

2018-04-07 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6742?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax reassigned KAFKA-6742:
--

Assignee: Valentino Proietti

> TopologyTestDriver error when dealing with stores from GlobalKTable
> ---
>
> Key: KAFKA-6742
> URL: https://issues.apache.org/jira/browse/KAFKA-6742
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.1.0
>Reporter: Valentino Proietti
>Assignee: Valentino Proietti
>Priority: Minor
>
> This JUnit test simply fails:
> {code:java}
> @Test
> public void globalTable() {
>     StreamsBuilder builder = new StreamsBuilder();
>     @SuppressWarnings("unused")
>     final KTable<String, String> localTable = builder
>         .table("local",
>                Consumed.with(Serdes.String(), Serdes.String()),
>                Materialized.as("localStore"));
>     @SuppressWarnings("unused")
>     final GlobalKTable<String, String> globalTable = builder
>         .globalTable("global",
>                      Consumed.with(Serdes.String(), Serdes.String()),
>                      Materialized.as("globalStore"));
>
>     Properties props = new Properties();
>     props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "test");
>     props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost");
>     TopologyTestDriver testDriver = new TopologyTestDriver(builder.build(), props);
>
>     final KeyValueStore<String, String> localStore = testDriver.getKeyValueStore("localStore");
>     Assert.assertNotNull(localStore);
>     Assert.assertNotNull(testDriver.getAllStateStores().get("localStore"));
>
>     final KeyValueStore<String, String> globalStore = testDriver.getKeyValueStore("globalStore");
>     Assert.assertNotNull(globalStore);
>     Assert.assertNotNull(testDriver.getAllStateStores().get("globalStore"));
>
>     final ConsumerRecordFactory<String, String> crf =
>         new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer());
>     testDriver.pipeInput(crf.create("local", "one", "TheOne"));
>     testDriver.pipeInput(crf.create("global", "one", "TheOne"));
>
>     Assert.assertEquals("TheOne", localStore.get("one"));
>     Assert.assertEquals("TheOne", globalStore.get("one"));
> }
> {code}
> To make it work I had to modify the TopologyTestDriver class as follows:
> {code:java}
> ...
>     public Map<String, StateStore> getAllStateStores() {
> //        final Map<String, StateStore> allStores = new HashMap<>();
> //        for (final String storeName : internalTopologyBuilder.allStateStoreName()) {
> //            allStores.put(storeName, ((ProcessorContextImpl) task.context()).getStateMgr().getStore(storeName));
> //        }
> //        return allStores;
>         // FIXME
>         final ProcessorStateManager psm = ((ProcessorContextImpl) task.context()).getStateMgr();
>         final Map<String, StateStore> allStores = new HashMap<>();
>         for (final String storeName : internalTopologyBuilder.allStateStoreName()) {
>             StateStore res = psm.getStore(storeName);
>             if (res == null)
>                 res = psm.getGlobalStore(storeName);
>             allStores.put(storeName, res);
>         }
>         return allStores;
>     }
> ...
>     public StateStore getStateStore(final String name) {
> //        return ((ProcessorContextImpl) task.context()).getStateMgr().getStore(name);
>         // FIXME
>         final ProcessorStateManager psm = ((ProcessorContextImpl) task.context()).getStateMgr();
>         StateStore res = psm.getStore(name);
>         if (res == null)
>             res = psm.getGlobalStore(name);
>         return res;
>     }
> {code}
> Moreover, I think it would be very useful to make the internal MockProducer public for testing
> cases where a producer is used alongside the "normal" stream processing, by adding the method:
> {code:java}
>     /**
>      * @return records sent with this producer are automatically streamed to the topology.
>      */
>     public final Producer<byte[], byte[]> getProducer() {
>         return producer;
>     }
> {code}
> Unfortunately this introduces another problem, which can be verified by adding the following
> lines to the previous JUnit test:
> {code:java}
> ...
> ConsumerRecord<byte[], byte[]> cr = crf.create("dummy", "two", "Second"); // just to serialize keys and values
> testDriver.getProducer().send(new ProducerRecord<>("local", null, cr.timestamp(), cr.key(), cr.value()));
> testDriver.getProducer().send(new ProducerRecord<>("global", null, cr.timestamp(), cr.key(), cr.value()));
> testDriver.advanceWallClockTime(0);
> Assert.assertEquals("TheOne", localStore.get("one"));
> Assert.assertEquals("Second", 

[jira] [Created] (KAFKA-6763) Consider using direct byte buffers in SslTransportLayer

2018-04-07 Thread Ismael Juma (JIRA)
Ismael Juma created KAFKA-6763:
--

 Summary: Consider using direct byte buffers in SslTransportLayer
 Key: KAFKA-6763
 URL: https://issues.apache.org/jira/browse/KAFKA-6763
 Project: Kafka
  Issue Type: Improvement
Reporter: Ismael Juma


We use heap byte buffers in SslTransportLayer. For netReadBuffer and 
netWriteBuffer, this means that the NIO layer has to copy to/from a native buffer 
before it can write to or read from the socket. It would be good to test whether 
switching to direct byte buffers improves performance. We can't be sure, as the 
benefit of avoiding the copy could be offset by the specifics of the operations 
we perform on netReadBuffer, netWriteBuffer and appReadBuffer.

We should benchmark produce and consume performance and try a few combinations 
of direct/heap byte buffers for netReadBuffer, netWriteBuffer and appReadBuffer 
(the latter should probably remain a heap byte buffer, but there is no harm in 
testing it too).
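For reference, a minimal sketch of the two allocation strategies under discussion (the buffer size is an arbitrary placeholder):
{code:java}
import java.nio.ByteBuffer;

public class BufferAllocationSketch {
    public static void main(String[] args) {
        // Current approach: heap buffer; the NIO layer copies to/from an internal
        // native buffer on every socket read/write.
        ByteBuffer heapNetReadBuffer = ByteBuffer.allocate(16 * 1024);

        // Proposed experiment: direct buffer; socket I/O can use it without the extra
        // copy, at the cost of more expensive allocation and different access patterns.
        ByteBuffer directNetReadBuffer = ByteBuffer.allocateDirect(16 * 1024);

        System.out.println(heapNetReadBuffer.isDirect() + " / " + directNetReadBuffer.isDirect());
    }
}
{code}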





[jira] [Commented] (KAFKA-6535) Set default retention ms for Streams repartition topics to Long.MAX_VALUE

2018-04-07 Thread Matthias J. Sax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6535?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16429509#comment-16429509
 ] 

Matthias J. Sax commented on KAFKA-6535:


[~Khairy] Yes it does. What is your wiki ID so we can grant you permission?

> Set default retention ms for Streams repartition topics to Long.MAX_VALUE
> -
>
> Key: KAFKA-6535
> URL: https://issues.apache.org/jira/browse/KAFKA-6535
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Khaireddine Rezgui
>Priority: Major
>  Labels: needs-kip, newbie
>
> After KIP-220 / KIP-204, repartition topics in Streams are transient, so it 
> is better to set their default retention to infinity, to allow records with 
> old timestamps to be pushed to them (think: bootstrapping, re-processing), 
> and to just rely on the purging API to keep their storage small.
> More specifically, in {{RepartitionTopicConfig}} we have a few default 
> overrides for repartition topic configs; we should just add an override for 
> {{TopicConfig.RETENTION_MS_CONFIG}} that sets it to Long.MAX_VALUE. This still 
> allows users to override it themselves if they want via 
> {{StreamsConfig.TOPIC_PREFIX}}. We need to add a unit test to verify this 
> update takes effect.
> In addition to the code change, we also need doc changes in 
> streams/upgrade_guide.html specifying this default value change.
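As a rough Java sketch of the two pieces described above (the exact override map inside {{RepartitionTopicConfig}} is not shown, and the names below are illustrative only):
{code:java}
import java.util.Properties;

import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.streams.StreamsConfig;

public class RepartitionRetentionSketch {
    public static void main(String[] args) {
        // The proposed default: the key/value pair that would be added to the
        // repartition-topic default overrides.
        String proposedDefault = String.valueOf(Long.MAX_VALUE);
        System.out.println(TopicConfig.RETENTION_MS_CONFIG + " -> " + proposedDefault);

        // The existing per-application override users could still apply via the topic prefix:
        Properties props = new Properties();
        props.put(StreamsConfig.topicPrefix(TopicConfig.RETENTION_MS_CONFIG), "604800000"); // 7 days
    }
}
{code}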





[jira] [Commented] (KAFKA-3827) log.message.format.version should default to inter.broker.protocol.version

2018-04-07 Thread Manasvi Gupta (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3827?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16429491#comment-16429491
 ] 

Manasvi Gupta commented on KAFKA-3827:
--

Hi All,

Just wanted to check if this is still a valid bug that needs to be fixed.

Thanks

Manasvi

> log.message.format.version should default to inter.broker.protocol.version
> --
>
> Key: KAFKA-3827
> URL: https://issues.apache.org/jira/browse/KAFKA-3827
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 0.10.0.0
>Reporter: Jun Rao
>Assignee: Manasvi Gupta
>Priority: Major
>  Labels: newbie
>
> Currently, if one sets inter.broker.protocol.version to 0.9.0 and restarts 
> the broker, one will get the following exception since 
> log.message.format.version defaults to 0.10.0. It would be more intuitive if 
> log.message.format.version defaulted to the value of 
> inter.broker.protocol.version.
> java.lang.IllegalArgumentException: requirement failed: 
> log.message.format.version 0.10.0-IV1 cannot be used when 
> inter.broker.protocol.version is set to 0.9.0.1
>   at scala.Predef$.require(Predef.scala:233)
>   at kafka.server.KafkaConfig.validateValues(KafkaConfig.scala:1023)
>   at kafka.server.KafkaConfig.<init>(KafkaConfig.scala:994)
>   at kafka.server.KafkaConfig$.fromProps(KafkaConfig.scala:743)
>   at kafka.server.KafkaConfig$.fromProps(KafkaConfig.scala:740)
>   at 
> kafka.server.KafkaServerStartable$.fromProps(KafkaServerStartable.scala:28)
>   at kafka.Kafka$.main(Kafka.scala:58)
>   at kafka.Kafka.main(Kafka.scala)
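For illustration, a hedged server.properties fragment showing the interplay described above (values taken from this ticket; the comments describe the proposed change, not current behavior):
{code}
# Broker running 0.10.0 code with an older inter-broker protocol:
inter.broker.protocol.version=0.9.0

# If log.message.format.version is left unset it defaults to 0.10.0-IV1, and startup
# fails with the IllegalArgumentException shown above. Today the workaround is to set
# it explicitly; the proposal is for it to default to inter.broker.protocol.version.
log.message.format.version=0.9.0
{code}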





[jira] [Assigned] (KAFKA-6629) SegmentedCacheFunctionTest does not cover session window serdes

2018-04-07 Thread Manasvi Gupta (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6629?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Manasvi Gupta reassigned KAFKA-6629:


Assignee: Manasvi Gupta

> SegmentedCacheFunctionTest does not cover session window serdes
> ---
>
> Key: KAFKA-6629
> URL: https://issues.apache.org/jira/browse/KAFKA-6629
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Manasvi Gupta
>Priority: Major
>  Labels: newbie, unit-test
>
> The SegmentedCacheFunctionTest.java only covers time window serdes, but not 
> session window serdes. We should fill in this coverage gap.





[jira] [Commented] (KAFKA-6760) responses not logged properly in controller

2018-04-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6760?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16429417#comment-16429417
 ] 

ASF GitHub Bot commented on KAFKA-6760:
---

mimaison opened a new pull request #4834: KAFKA-6760: Responses not logged 
properly in controller
URL: https://github.com/apache/kafka/pull/4834
 
 
   Added toString() to LeaderAndIsrResponse and StopReplicaResponse
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> responses not logged properly in controller
> ---
>
> Key: KAFKA-6760
> URL: https://issues.apache.org/jira/browse/KAFKA-6760
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Affects Versions: 1.1.0
>Reporter: Jun Rao
>Priority: Major
>  Labels: newbie
>
> Saw the following logging in controller.log. We need to log the 
> StopReplicaResponse properly in KafkaController.
> [2018-04-05 14:38:41,878] DEBUG [Controller id=0] Delete topic callback 
> invoked for org.apache.kafka.common.requests.StopReplicaResponse@263d40c 
> (kafka.controller.KafkaController)
> It seems that the same issue exists for LeaderAndIsrResponse as well.
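The {{StopReplicaResponse@263d40c}} above is just the default {{Object#toString}} rendering; below is a hedged, stand-alone Java sketch of the kind of {{toString()}} override the fix adds (a stand-in class with made-up fields, not the actual response classes; see PR #4834 for the real change):
{code:java}
// Illustrative only: the point is that overriding toString() turns
// "...StopReplicaResponse@263d40c" into a readable log line.
public class ResponseToStringSketch {
    private final short errorCode = 0;
    private final int partitionErrorCount = 0;

    @Override
    public String toString() {
        return "StopReplicaResponseSketch(errorCode=" + errorCode
                + ", partitionErrorCount=" + partitionErrorCount + ")";
    }

    public static void main(String[] args) {
        System.out.println(new ResponseToStringSketch()); // readable, unlike the default Object#toString
    }
}
{code}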





[jira] [Commented] (KAFKA-6762) log-cleaner thread terminates due to java.lang.IllegalStateException

2018-04-07 Thread Ted Yu (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6762?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16429416#comment-16429416
 ] 

Ted Yu commented on KAFKA-6762:
---

Here is the related code:
{code}
if(readBuffer.capacity >= maxBufferSize || writeBuffer.capacity >= maxBufferSize)
  throw new IllegalStateException("This log contains a message larger than maximum allowable size of %s.".format(maxBufferSize))
{code}
At the very least we should log the read/write buffer capacities in the exception 
message.
This is the next line:
{code}
val newSize = math.min(this.readBuffer.capacity * 2, maxBufferSize)
{code}
I think the condition for throwing IllegalStateException should be relaxed: 
if we cannot grow the buffer by doubling, we could grow it by whatever increment 
is large enough for log cleaning to make progress, for example as sketched below.
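A hedged Java sketch of that relaxed policy (purely illustrative, not the actual LogCleaner code, which is Scala): fail only when even the maximum buffer size cannot hold the message, and otherwise grow by at least enough to make progress.
{code:java}
public class BufferGrowthSketch {
    // Returns the next buffer capacity, growing by doubling when possible but by
    // whatever is required otherwise; throws only if the message can never fit.
    static int nextCapacity(int currentCapacity, int requiredSize, int maxBufferSize) {
        if (requiredSize > maxBufferSize)
            throw new IllegalStateException("Message of " + requiredSize
                    + " bytes exceeds the maximum allowable size of " + maxBufferSize);
        int doubled = Math.min(currentCapacity * 2, maxBufferSize);
        return Math.max(doubled, requiredSize);
    }

    public static void main(String[] args) {
        System.out.println(nextCapacity(524288, 900000, 1000012)); // grows to 1000012, no exception
    }
}
{code}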

Can you turn on DEBUG logging in your cluster? Maybe we can get more 
information from the DEBUG log.

Thanks

> log-cleaner thread terminates due to java.lang.IllegalStateException
> 
>
> Key: KAFKA-6762
> URL: https://issues.apache.org/jira/browse/KAFKA-6762
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 1.0.0
> Environment: os: GNU/Linux 
> arch: x86_64 
> Kernel: 4.9.77 
> jvm: OpenJDK 1.8.0
>Reporter: Ricardo Bartolome
>Priority: Major
>
> We are experiencing some problems with kafka log-cleaner thread on Kafka 
> 1.0.0. We have planned to update this cluster to 1.1.0 by next week in order 
> to fix KAFKA-6683, but until then we can only confirm that it happens in 
> 1.0.0.
> log-cleaner thread crashes after a while with the following error:
> {code:java}
> [2018-03-28 11:14:40,199] INFO Cleaner 0: Beginning cleaning of log 
> __consumer_offsets-31. (kafka.log.LogCleaner)
> [2018-03-28 11:14:40,199] INFO Cleaner 0: Building offset map for 
> __consumer_offsets-31... (kafka.log.LogCleaner)
> [2018-03-28 11:14:40,218] INFO Cleaner 0: Building offset map for log 
> __consumer_offsets-31 for 16 segments in offset range [1612869, 14282934). 
> (kafka.log.LogCleaner)
> [2018-03-28 11:14:58,566] INFO Cleaner 0: Offset map for log 
> __consumer_offsets-31 complete. (kafka.log.LogCleaner)
> [2018-03-28 11:14:58,566] INFO Cleaner 0: Cleaning log __consumer_offsets-31 
> (cleaning prior to Tue Mar 27 09:25:09 GMT 2018, discarding tombstones prior 
> to Sat Feb 24 11:04:21 GMT 2018
> )... (kafka.log.LogCleaner)
> [2018-03-28 11:14:58,567] INFO Cleaner 0: Cleaning segment 0 in log 
> __consumer_offsets-31 (largest timestamp Fri Feb 23 11:40:54 GMT 2018) into 
> 0, discarding deletes. (kafka.log.LogClea
> ner)
> [2018-03-28 11:14:58,570] INFO Cleaner 0: Growing cleaner I/O buffers from 
> 262144bytes to 524288 bytes. (kafka.log.LogCleaner)
> [2018-03-28 11:14:58,576] INFO Cleaner 0: Growing cleaner I/O buffers from 
> 524288bytes to 112 bytes. (kafka.log.LogCleaner)
> [2018-03-28 11:14:58,593] ERROR [kafka-log-cleaner-thread-0]: Error due to 
> (kafka.log.LogCleaner)
> java.lang.IllegalStateException: This log contains a message larger than 
> maximum allowable size of 112.
> at kafka.log.Cleaner.growBuffers(LogCleaner.scala:622)
> at kafka.log.Cleaner.cleanInto(LogCleaner.scala:574)
> at kafka.log.Cleaner.cleanSegments(LogCleaner.scala:459)
> at kafka.log.Cleaner.$anonfun$doClean$6(LogCleaner.scala:396)
> at kafka.log.Cleaner.$anonfun$doClean$6$adapted(LogCleaner.scala:395)
> at scala.collection.immutable.List.foreach(List.scala:389)
> at kafka.log.Cleaner.doClean(LogCleaner.scala:395)
> at kafka.log.Cleaner.clean(LogCleaner.scala:372)
> at 
> kafka.log.LogCleaner$CleanerThread.cleanOrSleep(LogCleaner.scala:263)
> at kafka.log.LogCleaner$CleanerThread.doWork(LogCleaner.scala:243)
> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:64)
> [2018-03-28 11:14:58,601] INFO [kafka-log-cleaner-thread-0]: Stopped 
> (kafka.log.LogCleaner)
> [2018-04-04 14:25:12,773] INFO The cleaning for partition 
> __broker-11-health-check-0 is aborted and paused (kafka.log.LogCleaner)
> [2018-04-04 14:25:12,773] INFO Compaction for partition 
> __broker-11-health-check-0 is resumed (kafka.log.LogCleaner)
> [2018-04-04 14:25:12,774] INFO The cleaning for partition 
> __broker-11-health-check-0 is aborted (kafka.log.LogCleaner)
> [2018-04-04 14:25:22,850] INFO Shutting down the log cleaner. 
> (kafka.log.LogCleaner)
> [2018-04-04 14:25:22,850] INFO [kafka-log-cleaner-thread-0]: Shutting down 
> (kafka.log.LogCleaner)
> [2018-04-04 14:25:22,850] INFO [kafka-log-cleaner-thread-0]: Shutdown 
> completed (kafka.log.LogCleaner)
> {code}
> What we know so far is:
>  * We are unable to reproduce it yet in a consistent manner.
>  * It only happens in the PRO cluster and not in the PRE cluster for 

[jira] [Updated] (KAFKA-6762) log-cleaner thread terminates due to java.lang.IllegalStateException

2018-04-07 Thread Ricardo Bartolome (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6762?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ricardo Bartolome updated KAFKA-6762:
-
Description: 
We are experiencing some problems with the Kafka log-cleaner thread on Kafka 1.0.0. 
We plan to update this cluster to 1.1.0 next week in order to fix 
KAFKA-6683, but until then we can only confirm that it happens in 1.0.0.

log-cleaner thread crashes after a while with the following error:
{code:java}
[2018-03-28 11:14:40,199] INFO Cleaner 0: Beginning cleaning of log 
__consumer_offsets-31. (kafka.log.LogCleaner)
[2018-03-28 11:14:40,199] INFO Cleaner 0: Building offset map for 
__consumer_offsets-31... (kafka.log.LogCleaner)
[2018-03-28 11:14:40,218] INFO Cleaner 0: Building offset map for log 
__consumer_offsets-31 for 16 segments in offset range [1612869, 14282934). 
(kafka.log.LogCleaner)
[2018-03-28 11:14:58,566] INFO Cleaner 0: Offset map for log 
__consumer_offsets-31 complete. (kafka.log.LogCleaner)
[2018-03-28 11:14:58,566] INFO Cleaner 0: Cleaning log __consumer_offsets-31 
(cleaning prior to Tue Mar 27 09:25:09 GMT 2018, discarding tombstones prior to 
Sat Feb 24 11:04:21 GMT 2018
)... (kafka.log.LogCleaner)
[2018-03-28 11:14:58,567] INFO Cleaner 0: Cleaning segment 0 in log 
__consumer_offsets-31 (largest timestamp Fri Feb 23 11:40:54 GMT 2018) into 0, 
discarding deletes. (kafka.log.LogClea
ner)
[2018-03-28 11:14:58,570] INFO Cleaner 0: Growing cleaner I/O buffers from 
262144bytes to 524288 bytes. (kafka.log.LogCleaner)
[2018-03-28 11:14:58,576] INFO Cleaner 0: Growing cleaner I/O buffers from 
524288bytes to 112 bytes. (kafka.log.LogCleaner)
[2018-03-28 11:14:58,593] ERROR [kafka-log-cleaner-thread-0]: Error due to 
(kafka.log.LogCleaner)
java.lang.IllegalStateException: This log contains a message larger than 
maximum allowable size of 112.
at kafka.log.Cleaner.growBuffers(LogCleaner.scala:622)
at kafka.log.Cleaner.cleanInto(LogCleaner.scala:574)
at kafka.log.Cleaner.cleanSegments(LogCleaner.scala:459)
at kafka.log.Cleaner.$anonfun$doClean$6(LogCleaner.scala:396)
at kafka.log.Cleaner.$anonfun$doClean$6$adapted(LogCleaner.scala:395)
at scala.collection.immutable.List.foreach(List.scala:389)
at kafka.log.Cleaner.doClean(LogCleaner.scala:395)
at kafka.log.Cleaner.clean(LogCleaner.scala:372)
at kafka.log.LogCleaner$CleanerThread.cleanOrSleep(LogCleaner.scala:263)
at kafka.log.LogCleaner$CleanerThread.doWork(LogCleaner.scala:243)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:64)
[2018-03-28 11:14:58,601] INFO [kafka-log-cleaner-thread-0]: Stopped 
(kafka.log.LogCleaner)
[2018-04-04 14:25:12,773] INFO The cleaning for partition 
__broker-11-health-check-0 is aborted and paused (kafka.log.LogCleaner)
[2018-04-04 14:25:12,773] INFO Compaction for partition 
__broker-11-health-check-0 is resumed (kafka.log.LogCleaner)
[2018-04-04 14:25:12,774] INFO The cleaning for partition 
__broker-11-health-check-0 is aborted (kafka.log.LogCleaner)
[2018-04-04 14:25:22,850] INFO Shutting down the log cleaner. 
(kafka.log.LogCleaner)
[2018-04-04 14:25:22,850] INFO [kafka-log-cleaner-thread-0]: Shutting down 
(kafka.log.LogCleaner)
[2018-04-04 14:25:22,850] INFO [kafka-log-cleaner-thread-0]: Shutdown completed 
(kafka.log.LogCleaner)
{code}
What we know so far is:
 * We are unable to reproduce it yet in a consistent manner.
 * It only happens in the PRO cluster and not in the PRE cluster for the same 
customer (whose message payloads are very similar).
 * Checking our Kafka logs, it only happened on the internal topics 
*__consumer_offsets-**
 * When we restart the broker process, the log-cleaner starts working again, but 
it can take anywhere between 3 minutes and a few hours before it dies again.
 * We worked around it by temporarily increasing the message.max.bytes and 
replica.fetch.max.bytes values to 10485760 (10MB) from the default 112 (~1MB).
 ** Before message.max.bytes = 10MB, we tried to match message.max.size with the 
value of replica.fetch.max.size (1048576), but the log-cleaner died with the same 
error, just with a different limit.
 ** This allowed the log-cleaner to stay alive and compact enough data for disk 
space usage to go from ~600GB to ~100GB.
 ** Without this limit change, the log-cleaner dies after a while, the used disk 
space stays at ~450GB, and it starts growing again due to cluster activity.

Our server.properties content is as follows, as printed in server.log at 
broker startup.
{code:java}
broker.id=11
delete.topic.enable=true
advertised.listeners=PLAINTEXT://broker-ip:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/var/lib/kafka
num.partitions=12
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3

[jira] [Created] (KAFKA-6762) log-cleaner thread terminates due to java.lang.IllegalStateException

2018-04-07 Thread Ricardo Bartolome (JIRA)
Ricardo Bartolome created KAFKA-6762:


 Summary: log-cleaner thread terminates due to 
java.lang.IllegalStateException
 Key: KAFKA-6762
 URL: https://issues.apache.org/jira/browse/KAFKA-6762
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 1.0.0
 Environment: os: GNU/Linux 
arch: x86_64 
Kernel: 4.9.77 
jvm: OpenJDK 1.8.0
Reporter: Ricardo Bartolome


We are experiencing some problems with the Kafka log-cleaner thread on Kafka 1.0.0. 
We plan to update this cluster to 1.1.0 next week in order to fix 
KAFKA-6683, but until then we can only confirm that it happens in 1.0.0.

log-cleaner thread crashes after a while with the following error:
{code:java}
[2018-03-28 11:14:40,199] INFO Cleaner 0: Beginning cleaning of log 
__consumer_offsets-31. (kafka.log.LogCleaner)
[2018-03-28 11:14:40,199] INFO Cleaner 0: Building offset map for 
__consumer_offsets-31... (kafka.log.LogCleaner)
[2018-03-28 11:14:40,218] INFO Cleaner 0: Building offset map for log 
__consumer_offsets-31 for 16 segments in offset range [1612869, 14282934). 
(kafka.log.LogCleaner)
[2018-03-28 11:14:58,566] INFO Cleaner 0: Offset map for log 
__consumer_offsets-31 complete. (kafka.log.LogCleaner)
[2018-03-28 11:14:58,566] INFO Cleaner 0: Cleaning log __consumer_offsets-31 
(cleaning prior to Tue Mar 27 09:25:09 GMT 2018, discarding tombstones prior to 
Sat Feb 24 11:04:21 GMT 2018
)... (kafka.log.LogCleaner)
[2018-03-28 11:14:58,567] INFO Cleaner 0: Cleaning segment 0 in log 
__consumer_offsets-31 (largest timestamp Fri Feb 23 11:40:54 GMT 2018) into 0, 
discarding deletes. (kafka.log.LogClea
ner)
[2018-03-28 11:14:58,570] INFO Cleaner 0: Growing cleaner I/O buffers from 
262144bytes to 524288 bytes. (kafka.log.LogCleaner)
[2018-03-28 11:14:58,576] INFO Cleaner 0: Growing cleaner I/O buffers from 
524288bytes to 112 bytes. (kafka.log.LogCleaner)
[2018-03-28 11:14:58,593] ERROR [kafka-log-cleaner-thread-0]: Error due to 
(kafka.log.LogCleaner)
java.lang.IllegalStateException: This log contains a message larger than 
maximum allowable size of 112.
at kafka.log.Cleaner.growBuffers(LogCleaner.scala:622)
at kafka.log.Cleaner.cleanInto(LogCleaner.scala:574)
at kafka.log.Cleaner.cleanSegments(LogCleaner.scala:459)
at kafka.log.Cleaner.$anonfun$doClean$6(LogCleaner.scala:396)
at kafka.log.Cleaner.$anonfun$doClean$6$adapted(LogCleaner.scala:395)
at scala.collection.immutable.List.foreach(List.scala:389)
at kafka.log.Cleaner.doClean(LogCleaner.scala:395)
at kafka.log.Cleaner.clean(LogCleaner.scala:372)
at kafka.log.LogCleaner$CleanerThread.cleanOrSleep(LogCleaner.scala:263)
at kafka.log.LogCleaner$CleanerThread.doWork(LogCleaner.scala:243)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:64)
[2018-03-28 11:14:58,601] INFO [kafka-log-cleaner-thread-0]: Stopped 
(kafka.log.LogCleaner)
[2018-04-04 14:25:12,773] INFO The cleaning for partition 
__broker-11-health-check-0 is aborted and paused (kafka.log.LogCleaner)
[2018-04-04 14:25:12,773] INFO Compaction for partition 
__broker-11-health-check-0 is resumed (kafka.log.LogCleaner)
[2018-04-04 14:25:12,774] INFO The cleaning for partition 
__broker-11-health-check-0 is aborted (kafka.log.LogCleaner)
[2018-04-04 14:25:22,850] INFO Shutting down the log cleaner. 
(kafka.log.LogCleaner)
[2018-04-04 14:25:22,850] INFO [kafka-log-cleaner-thread-0]: Shutting down 
(kafka.log.LogCleaner)
[2018-04-04 14:25:22,850] INFO [kafka-log-cleaner-thread-0]: Shutdown completed 
(kafka.log.LogCleaner)
{code}
What we know so far is:
 * We are unable to reproduce it yet in a consistent manner.
 * It only happens in the PRO cluster and not in the PRE cluster for the same 
customer (whose message payloads are very similar).
 * Checking our Kafka logs, it only happened on the internal topics 
*__consumer_offsets-**
 * When we restart the broker process, the log-cleaner starts working again, but 
it can take anywhere between 3 minutes and a few hours before it dies again.
 * We worked around it by temporarily increasing the message.max.bytes and 
replica.fetch.max.bytes values to 10485760 (10MB) from the default 112 (~1MB).
 ** Before message.max.bytes = 10MB, we tried to match message.max.size with the 
value of replica.fetch.max.size (1048576), but the log-cleaner died with the same 
error, just with a different limit.
 ** This allowed the log-cleaner to stay alive and compact enough data for disk 
space usage to go from ~600GB to ~100GB.
 ** Without this limit change, the log-cleaner dies after a while, the used disk 
space stays at ~450GB, and it starts growing again due to cluster activity.

Our server.properties content is as follows, as printed in server.log at 
broker startup.
{code:java}
broker.id=11
delete.topic.enable=true
advertised.listeners=PLAINTEXT://broker-ip:9092
num.network.threads=3

[jira] [Commented] (KAFKA-6535) Set default retention ms for Streams repartition topics to Long.MAX_VALUE

2018-04-07 Thread Khaireddine Rezgui (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6535?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16429352#comment-16429352
 ] 

Khaireddine Rezgui commented on KAFKA-6535:
---

Hi [~vvcephei], I have access to the Confluence wiki, but I don't have a Create 
menu. Does it require permission?

> Set default retention ms for Streams repartition topics to Long.MAX_VALUE
> -
>
> Key: KAFKA-6535
> URL: https://issues.apache.org/jira/browse/KAFKA-6535
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Khaireddine Rezgui
>Priority: Major
>  Labels: needs-kip, newbie
>
> After KIP-220 / KIP-204, repartition topics in Streams are transient, so it 
> is better to set their default retention to infinity, to allow records with 
> old timestamps to be pushed to them (think: bootstrapping, re-processing), 
> and to just rely on the purging API to keep their storage small.
> More specifically, in {{RepartitionTopicConfig}} we have a few default 
> overrides for repartition topic configs; we should just add an override for 
> {{TopicConfig.RETENTION_MS_CONFIG}} that sets it to Long.MAX_VALUE. This still 
> allows users to override it themselves if they want via 
> {{StreamsConfig.TOPIC_PREFIX}}. We need to add a unit test to verify this 
> update takes effect.
> In addition to the code change, we also need doc changes in 
> streams/upgrade_guide.html specifying this default value change.


