Re: New consumer - ConsumerRecords partitions
Hey Stevo, I think ConsumerRecords only contains the partitions which had messages. Would you mind creating a JIRA for the feature request? You're welcome to submit a patch as well. -Jason On Tue, Jul 21, 2015 at 2:27 AM, Stevo Slavić ssla...@gmail.com wrote: Hello Apache Kafka community, New HLC poll returns ConsumerRecords. Do ConsumerRecords contain records for every partition that HLC is actively subscribed on for every poll request, or does it contain only records for partitions which had messages and which were retrieved in poll request? If the latter, then please consider adding a method to the ConsumerRecords class, public Iterable<TopicPartition> getPartitions(), returning the partitions that the ConsumerRecords has. I could provide a PR. Kind regards, Stevo Slavic.
Re: broker data directory
Thank you, Nicolas! On Tue, Jul 21, 2015 at 10:46 AM, Nicolas Phung nicolas.ph...@gmail.com wrote: Yes indeed.
# A comma separated list of directories under which to store log files
log.dirs=/var/lib/kafka
You can list several disks/partitions too. Regards, On Tue, Jul 21, 2015 at 4:37 PM, Yuheng Du yuheng.du.h...@gmail.com wrote: Just wanna make sure, in server.properties, the configuration log.dirs=/tmp/kafka-logs specifies the directory where the log (data) is stored, right? If I want the data to be saved elsewhere, this is the configuration I need to change, right? Thanks for answering. best,
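As a rough illustration of the comma-separated form Nicolas mentions, the sketch below reads a server.properties-style string with plain java.util.Properties and splits log.dirs into individual directories. It is not Kafka's own parsing code. The class name LogDirsExample, the second path /mnt/disk2/kafka and the fallback to /tmp/kafka-logs (the value from the question) are assumptions made for the example.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

class LogDirsExample {
    // Splits the comma-separated log.dirs value found in server.properties-style text.
    static List<String> parseLogDirs(String propertiesText) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(propertiesText));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        // Assumption: fall back to the directory named in the question when log.dirs is absent.
        String raw = props.getProperty("log.dirs", "/tmp/kafka-logs");
        List<String> dirs = new ArrayList<>();
        for (String dir : raw.split(",")) {
            String trimmed = dir.trim();
            if (!trimmed.isEmpty()) {
                dirs.add(trimmed);
            }
        }
        return dirs;
    }

    public static void main(String[] args) {
        // Two hypothetical data directories on separate disks.
        String config = "log.dirs=/var/lib/kafka,/mnt/disk2/kafka\n";
        System.out.println(parseLogDirs(config));
    }
}
```

Changing where the broker keeps its data then comes down to editing that one property and restarting the broker.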
Re: New producer hangs infinitely when it loses connection to Kafka cluster
This is a known issue. There are a few relevant JIRAs and a KIP: https://issues.apache.org/jira/browse/KAFKA-1788 https://issues.apache.org/jira/browse/KAFKA-2120 https://cwiki.apache.org/confluence/display/KAFKA/KIP-19+-+Add+a+request+timeout+to+NetworkClient -Ewen On Tue, Jul 21, 2015 at 7:05 AM, Stevo Slavić ssla...@gmail.com wrote: Hello Apache Kafka community, Just noticed that : - message is successfully published using new 0.8.2.1 producer - and then Kafka is stopped next attempt to publish message using same instance of new producer hangs forever, and following stacktrace gets logged repeatedly: [WARN ] [o.a.kafka.common.network.Selector] [] Error in I/O with localhost/ 127.0.0.1 java.net.ConnectException: Connection refused at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) ~[na:1.8.0_31] at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:716) ~[na:1.8.0_31] at org.apache.kafka.common.network.Selector.poll(Selector.java:238) ~[kafka-clients-0.8.2.1.jar:na] at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:192) [kafka-clients-0.8.2.1.jar:na] at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:191) [kafka-clients-0.8.2.1.jar:na] at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:122) [kafka-clients-0.8.2.1.jar:na] at java.lang.Thread.run(Thread.java:745) [na:1.8.0_31] I expect producer to respect timeout settings even in this connection lost scenario. Is this a known bug? Is there something I can do/configure as a workaround? Kind regards, Stevo Slavic. -- Thanks, Ewen
Re: New consumer - poll/seek javadoc confusing, need clarification
On Tue, Jul 21, 2015 at 2:38 AM, Stevo Slavić ssla...@gmail.com wrote: Hello Apache Kafka community, I find new consumer poll/seek javadoc a bit confusing. Just by reading docs I'm not sure what the outcome will be, what is expected in following scenario: - kafkaConsumer is instantiated with auto-commit off - kafkaConsumer.subscribe(someTopic) - kafkaConsumer.position is called for every TopicPartition HLC is actively subscribed on and then when doing multiple poll calls in succession (without calling commit), does seek have to be called in between poll calls to position HLC to skip what was read in previous poll, or does HLC keep that state (position after poll) in memory, so that next poll (without seek in between two poll calls) will continue from where last poll stopped? The position is tracked in-memory within the consumer, so as long as there isn't a consumer rebalance, consumption will just proceed with subsequent messages (i.e. the behavior I think most people would find intuitive). However, if a rebalance occurs (another consumer instance joins the group or some leave), then a partition may be assigned to a different consumer instance that has no idea about the current position and will restart based on the offset reset setting (because attempting to fetch the committed offset will fail since no offsets have been committed). -Ewen Could be it's just me not understanding this from javadoc. If not, maybe javadoc can be improved to make this (even) more obvious. Kind regards, Stevo Slavic. -- Thanks, Ewen
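To make Ewen's point concrete, here is a toy model of a single partition in plain Java, not the KafkaConsumer API. The class InMemoryPositionSketch and its methods are hypothetical names. It only shows that when the position is held in memory, successive polls continue where the previous one stopped without any seek in between.

```java
import java.util.ArrayList;
import java.util.List;

class InMemoryPositionSketch {
    private final List<String> partitionLog; // stands in for one partition's messages
    private int position;                    // next offset to fetch, kept only in memory

    InMemoryPositionSketch(List<String> partitionLog, int startOffset) {
        this.partitionLog = partitionLog;
        this.position = startOffset;
    }

    // Returns up to maxRecords messages starting at the current position and advances it.
    List<String> poll(int maxRecords) {
        int end = Math.min(position + maxRecords, partitionLog.size());
        List<String> batch = new ArrayList<>(partitionLog.subList(position, end));
        position = end;
        return batch;
    }

    int position() {
        return position;
    }
}
```

In this model a second instance built with a different start offset knows nothing about the first instance's position, which mirrors what happens after a rebalance when no offsets were committed.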
Re: Retrieving lost messages produced while the consumer was down.
Since you mentioned consumer groups, I'm assuming you're using the high level consumer? Do you have auto.commit.enable set to true? It sounds like when you start up you are always getting the auto.offset.reset behavior, which indicates you don't have any offsets committed. By default, that behavior is to consume from the latest offset (which would only get messages produced after the consumer starts). To get the behavior you're looking for, you should make sure to commit offsets when you're shutting down your consumer so it will resume where you left off the next time you start it. Unless you are using the SimpleConsumer, you shouldn't need to explicitly make any requests yourself. On Tue, Jul 21, 2015 at 2:24 PM, Tomas Niño Kehoe tomasninoke...@gmail.com wrote: Hi, We've been using Kafka for a couple of months, and now we're trying to write a simple application using the ConsumerGroup to fully understand Kafka. Having the producer continually writing data, our consumer occasionally needs to be restarted. However, once the program is brought back up, messages which were produced during that period of time are not being read. Instead, the consumer (this is a single consumer inside a consumer group) will read the messages produced after it was brought back up. Its configuration doesn't change at all. For example using the simple consumer/producer apps: Produced 1, 2, 3, 4, 5 Consumed 1, 2, 3, 4, 5 [Stop the consumer] Produce 20, 21, 22, 23 When the consumer is brought back up, I'd like to get 20, 21, 22, 23, but I will only get either new messages, or all the messages using (--from-beginning). Is there a way of achieving this programmatically, without for example writing an offset into the zookeeper node? Is the OffsetCommitRequest the way to go? Thanks in advance Tomás -- Thanks, Ewen
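The decision Ewen describes can be sketched as a small pure function: use the committed offset if one exists, otherwise fall back to the auto.offset.reset policy. This is a simplified model and not the consumer's actual code. StartOffsetSketch and resolveStartOffset are hypothetical names. "smallest" and "largest" are the reset values used by the old high level consumer, and the offsets in main are chosen to match Tomás's example on the assumption that the first message sits at offset 0.

```java
import java.util.OptionalLong;

class StartOffsetSketch {
    // Picks the offset a restarted consumer would begin reading from.
    static long resolveStartOffset(OptionalLong committed, String autoOffsetReset,
                                   long earliest, long latest) {
        if (committed.isPresent()) {
            return committed.getAsLong();      // resume where the group left off
        }
        if ("smallest".equals(autoOffsetReset)) {
            return earliest;                   // replay everything still retained
        }
        if ("largest".equals(autoOffsetReset)) {
            return latest;                     // only messages produced from now on
        }
        throw new IllegalArgumentException("unknown reset policy: " + autoOffsetReset);
    }

    public static void main(String[] args) {
        // Five messages were consumed (offsets 0-4), then four more were produced (offsets 5-8).
        long earliest = 0L;
        long latest = 9L;
        System.out.println(resolveStartOffset(OptionalLong.of(5L), "largest", earliest, latest));
        System.out.println(resolveStartOffset(OptionalLong.empty(), "largest", earliest, latest));
    }
}
```

With a committed offset of 5 the consumer would pick up 20, 21, 22, 23. Without one, the "largest" policy skips them, which matches the behavior reported in the question.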
Retrieving lost messages produced while the consumer was down.
Hi, We've been using Kafka for a couple of months, and now we're trying to write a simple application using the ConsumerGroup to fully understand Kafka. Having the producer continually writing data, our consumer occasionally needs to be restarted. However, once the program is brought back up, messages which were produced during that period of time are not being read. Instead, the consumer (this is a single consumer inside a consumer group) will read the messages produced after it was brought back up. Its configuration doesn't change at all. For example using the simple consumer/producer apps: Produced 1, 2, 3, 4, 5 Consumed 1, 2, 3, 4, 5 [Stop the consumer] Produce 20, 21, 22, 23 When the consumer is brought back up, I'd like to get 20, 21, 22, 23, but I will only get either new messages, or all the messages using (--from-beginning). Is there a way of achieving this programmatically, without for example writing an offset into the zookeeper node? Is the OffsetCommitRequest the way to go? Thanks in advance Tomás
Implementing a custom partitioner
Hi all, If I wanted to write my own partitioner, all I would need to do is write a class that extends Partitioner and override the partition function, correct? I am currently doing so, at least, with a class in the package 'services', yet when I use: properties.put(partitioner.class, services.myPartitioner); and instantiate my producer, this doesn't work properly. I'm using a simple switch statement, so I am led to believe that I have not improperly written my partitioner. After attempting to debug the issue, I notice that the constructor I'm entering when attempting to instantiate the producer has the line: this.partitioner = new Partitioner(); which more or less ignores my input. Any ideas? Help is appreciated! -- Jiefu Gong University of California, Berkeley | Class of 2017 B.A Computer Science | College of Letters and Sciences jg...@berkeley.edu elise...@berkeley.edu | (925) 400-3427
Re: Implementing a custom partitioner
Hi, Are you using the latest trunk for Producer API?. Did you implement the interface here https://cwiki.apache.org/confluence/display/KAFKA/KIP-+22+-+Expose+a+Partitioner+interface+in+the+new+producer -- Harsha On July 21, 2015 at 2:27:05 PM, JIEFU GONG (jg...@berkeley.edu) wrote: Hi all, If I wanted to write my own partitioner, all I would need to do is write a class that extends Partitioner and override the partition function, correct? I am currently doing so, at least, with a class in the package 'services', yet when I use: properties.put(partitioner.class, services.myPartitioner); and instantiate my producer, this doesn't work properly. I'm using a simple switch statement, so I am led to believe that I have not improperly written my partitioner. After attempting to debug the issue, I notice that the constructor I'm entering when attempting to instantiate the producer has the line: this.partitioner = new Partitioner(); which more or less ignores my input. Any ideas? Help is appreciated! -- Jiefu Gong University of California, Berkeley | Class of 2017 B.A Computer Science | College of Letters and Sciences jg...@berkeley.edu elise...@berkeley.edu | (925) 400-3427
Re: Implementing a custom partitioner
Sriharsha, thanks for your response. I'm using version 0.8.2, and I am implementing kafka.producer.Partitioner. I noticed that in the latest trunk the line I specified above is replaced with: this.partitioner = config.getConfiguredInstance(ProducerConfig. PARTITIONER_CLASS_CONFIG, Partitioner.class); does this mean I cannot use my own partitioner in v 0.8.2? On Tue, Jul 21, 2015 at 2:48 PM, Sriharsha Chintalapani ka...@harsha.io wrote: Hi, Are you using the latest trunk for Producer API?. Did you implement the interface here https://cwiki.apache.org/confluence/display/KAFKA/KIP-+22+-+Expose+a+Partitioner+interface+in+the+new+producer -- Harsha On July 21, 2015 at 2:27:05 PM, JIEFU GONG (jg...@berkeley.edu) wrote: Hi all, If I wanted to write my own partitioner, all I would need to do is write a class that extends Partitioner and override the partition function, correct? I am currently doing so, at least, with a class in the package 'services', yet when I use: properties.put(partitioner.class, services.myPartitioner); and instantiate my producer, this doesn't work properly. I'm using a simple switch statement, so I am led to believe that I have not improperly written my partitioner. After attempting to debug the issue, I notice that the constructor I'm entering when attempting to instantiate the producer has the line: this.partitioner = new Partitioner(); which more or less ignores my input. Any ideas? Help is appreciated! -- Jiefu Gong University of California, Berkeley | Class of 2017 B.A Computer Science | College of Letters and Sciences jg...@berkeley.edu elise...@berkeley.edu | (925) 400-3427 -- Jiefu Gong University of California, Berkeley | Class of 2017 B.A Computer Science | College of Letters and Sciences jg...@berkeley.edu elise...@berkeley.edu | (925) 400-3427
Re: Implementing a custom partitioner
If you are using the new producer API from Kafka 0.8.2, there is no pluggable partitioner in it; for this you need to use the latest trunk. But in 0.8.2, if you are using the old producer code, you can implement a pluggable partitioner https://github.com/apache/kafka/blob/0.8.2/core/src/main/scala/kafka/producer/ProducerConfig.scala#L69 by implementing this interface https://github.com/apache/kafka/blob/0.8.2/core/src/main/scala/kafka/producer/Partitioner.scala and it gets created here https://github.com/apache/kafka/blob/0.8.2/core/src/main/scala/kafka/producer/Producer.scala#L61 Thanks, Harsha On July 21, 2015 at 2:54:05 PM, JIEFU GONG (jg...@berkeley.edu) wrote: Sriharsha, thanks for your response. I'm using version 0.8.2, and I am implementing kafka.producer.Partitioner. I noticed that in the latest trunk the line I specified above is replaced with: this.partitioner = config.getConfiguredInstance(ProducerConfig.PARTITIONER_CLASS_CONFIG, Partitioner.class); does this mean I cannot use my own partitioner in v 0.8.2? On Tue, Jul 21, 2015 at 2:48 PM, Sriharsha Chintalapani ka...@harsha.io wrote: Hi, Are you using the latest trunk for Producer API? Did you implement the interface here https://cwiki.apache.org/confluence/display/KAFKA/KIP-+22+-+Expose+a+Partitioner+interface+in+the+new+producer -- Harsha On July 21, 2015 at 2:27:05 PM, JIEFU GONG (jg...@berkeley.edu) wrote: Hi all, If I wanted to write my own partitioner, all I would need to do is write a class that extends Partitioner and override the partition function, correct? I am currently doing so, at least, with a class in the package 'services', yet when I use: properties.put(partitioner.class, services.myPartitioner); and instantiate my producer, this doesn't work properly. I'm using a simple switch statement, so I am led to believe that I have not improperly written my partitioner. After attempting to debug the issue, I notice that the constructor I'm entering when attempting to instantiate the producer has the line: this.partitioner = new Partitioner(); which more or less ignores my input. Any ideas? Help is appreciated! -- Jiefu Gong University of California, Berkeley | Class of 2017 B.A Computer Science | College of Letters and Sciences jg...@berkeley.edu elise...@berkeley.edu | (925) 400-3427 -- Jiefu Gong University of California, Berkeley | Class of 2017 B.A Computer Science | College of Letters and Sciences jg...@berkeley.edu | (925) 400-3427
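For the old-producer route Harsha describes, a dependency-free sketch of the two pieces involved might look like the following. It does not implement kafka.producer.Partitioner itself, because that would need the Kafka jar. The static partition method only mirrors the (key, numPartitions) shape of that interface. The names SwitchPartitionerSketch and services.MyPartitioner and the keys "orders" and "payments" are hypothetical. It is also worth checking against the linked Producer.scala whether the real class needs a constructor that accepts the producer's properties.

```java
import java.util.Properties;

class SwitchPartitionerSketch {
    // Same shape as partition(key, numPartitions) in the old producer's Partitioner interface.
    static int partition(Object key, int numPartitions) {
        String k = String.valueOf(key);
        switch (k) {
            case "orders":
                return 0 % numPartitions;
            case "payments":
                return 1 % numPartitions;
            default:
                // Mask the sign bit so the result is always a valid partition index.
                return (k.hashCode() & 0x7fffffff) % numPartitions;
        }
    }

    // The old producer is configured with the fully qualified class name as a string.
    static Properties producerProps() {
        Properties props = new Properties();
        props.put("partitioner.class", "services.MyPartitioner"); // hypothetical class name
        return props;
    }

    public static void main(String[] args) {
        System.out.println(partition("orders", 3));
        System.out.println(producerProps().getProperty("partitioner.class"));
    }
}
```

Note that both the key and the value passed to properties.put are strings here. With the new producer in 0.8.2 the setting has no effect, as the thread explains.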
ZK chroot path would be automatically created since Kafka 0.8.2.0?
Hi, The documentation for zookeeper.connect under Broker Configs says "Note that you must create this path yourself prior to starting the broker", but it seems the broker creates the path automatically on startup (possibly related issue: https://issues.apache.org/jira/browse/KAFKA-404 ). So is the documentation simply out of date? Thanks, Yuto Sasaki
Re: New consumer - poll/seek javadoc confusing, need clarification
On Jul 21, 2015, at 9:15 AM, Ewen Cheslack-Postava e...@confluent.io wrote: On Tue, Jul 21, 2015 at 2:38 AM, Stevo Slavić ssla...@gmail.com wrote: Hello Apache Kafka community, I find new consumer poll/seek javadoc a bit confusing. Just by reading docs I'm not sure what the outcome will be, what is expected in following scenario: - kafkaConsumer is instantiated with auto-commit off - kafkaConsumer.subscribe(someTopic) - kafkaConsumer.position is called for every TopicPartition HLC is actively subscribed on and then when doing multiple poll calls in succession (without calling commit), does seek have to be called in between poll calls to position HLC to skip what was read in previous poll, or does HLC keep that state (position after poll) in memory, so that next poll (without seek in between two poll calls) will continue from where last poll stopped? The position is tracked in-memory within the consumer, so as long as there isn't a consumer rebalance, consumption will just proceed with subsequent messages (i.e. the behavior I think most people would find intuitive). However, if a rebalance occurs (another consumer instance joins the group or some leave), then a partition may be assigned to an different consumer instance that has no idea about the current position and will restart based on the offset reset setting (because attempting to fetch the committed offset will fail since no offsets have been committed). Ewen, What happens if there is a broker failure and a new broker becomes the partition leader? Does the high level consumer start listening to the new partition leader at the in-memory position, or does it restart based on saved offsets? Thanks, -James -Ewen Could be it's just me not understanding this from javadoc. If not, maybe javadoc can be improved to make this (even) more obvious. Kind regards, Stevo Slavic. -- Thanks, Ewen
Re: Issue with corrupt message in Topic
Hi Nicolas, From my experience there are only two ways out: 1) wait for the retention time to pass, so the data gets deleted (this is usually unacceptable) 2) trace the offset of the corrupt message on all affected subscriptions and skip this message by overwriting the offset with (offset+1) The problem is that when encountering a corrupt message, the high level consumer iterator goes into an invalid state and closes. There is no way to skip this message or recover from it without skipping offsets. You might try to use SimpleConsumer though. Maybe someone knows other ways to deal with this problem, but we haven't found any. BR, Adam 2015-07-21 9:38 GMT+02:00 Nicolas Phung nicolas.ph...@gmail.com: Hello, I'm using Confluent Kafka (0.8.2.0-cp). When I'm trying to process messages from my Kafka topic with Spark Streaming, I get the following error: kafka.message.InvalidMessageException: Message is corrupt (stored crc = 3561357254, computed crc = 171652633) at kafka.message.Message.ensureValid(Message.scala:166) at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:102) at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:33) at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:66) at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:58) at org.apache.spark.streaming.kafka.ReliableKafkaReceiver$MessageHandler.run(ReliableKafkaReceiver.scala:265) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) 15/07/20 15:56:57 INFO BlockManager: Removing broadcast 4641 15/07/20 15:56:57 ERROR ReliableKafkaReceiver: Error handling message java.lang.IllegalStateException: Iterator is in failed state at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:54) at org.apache.spark.streaming.kafka.ReliableKafkaReceiver$MessageHandler.run(ReliableKafkaReceiver.scala:265) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) From my understanding, there's some corrupt message in my topic. I'm using the new Producer API to send messages compressed with Snappy. I found an old thread talking about it but with no further steps to resolve the issue. Do you have any information regarding this? Is it possible in Kafka to somehow reread the topic and drop the corrupt message? Regards, Nicolas PHUNG
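Adam's second option, finding the corrupt message and restarting at offset+1, can be illustrated with a toy model built on java.util.zip.CRC32. This is not Kafka's message format or its SimpleConsumer API. The class SkipCorruptSketch, its method names, and the idea of holding payloads and stored CRCs in parallel arrays indexed by offset are assumptions made purely for the example.

```java
import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32;

class SkipCorruptSketch {
    // Computes a CRC32 checksum over a message payload.
    static long crcOf(byte[] payload) {
        CRC32 crc = new CRC32();
        crc.update(payload);
        return crc.getValue();
    }

    // Scans from offset `from` and returns the offset just past the first message whose
    // stored CRC does not match the computed one, or -1 if no corrupt message is found.
    static long resumeOffsetAfterCorrupt(byte[][] payloads, long[] storedCrcs, int from) {
        for (int offset = from; offset < payloads.length; offset++) {
            if (crcOf(payloads[offset]) != storedCrcs[offset]) {
                return offset + 1L;
            }
        }
        return -1L;
    }

    public static void main(String[] args) {
        byte[][] payloads = {
            "a".getBytes(StandardCharsets.UTF_8),
            "b".getBytes(StandardCharsets.UTF_8),
            "c".getBytes(StandardCharsets.UTF_8)
        };
        // Flip one bit of the second stored checksum to simulate corruption.
        long[] stored = { crcOf(payloads[0]), crcOf(payloads[1]) ^ 1L, crcOf(payloads[2]) };
        System.out.println(resumeOffsetAfterCorrupt(payloads, stored, 0));
    }
}
```

The returned value is the offset one would then write as the group's committed offset for that partition, at the cost of losing the corrupt message.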
Issue with corrupt message in Topic
Hello, I'm using Confluent Kafka (0.8.2.0-cp). When I'm trying to process messages from my Kafka topic with Spark Streaming, I get the following error: kafka.message.InvalidMessageException: Message is corrupt (stored crc = 3561357254, computed crc = 171652633) at kafka.message.Message.ensureValid(Message.scala:166) at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:102) at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:33) at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:66) at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:58) at org.apache.spark.streaming.kafka.ReliableKafkaReceiver$MessageHandler.run(ReliableKafkaReceiver.scala:265) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) 15/07/20 15:56:57 INFO BlockManager: Removing broadcast 4641 15/07/20 15:56:57 ERROR ReliableKafkaReceiver: Error handling message java.lang.IllegalStateException: Iterator is in failed state at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:54) at org.apache.spark.streaming.kafka.ReliableKafkaReceiver$MessageHandler.run(ReliableKafkaReceiver.scala:265) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) From my understanding, there's some corrupt message in my topic. I'm using the new Producer API to send messages compressed with Snappy. I found an old thread talking about it but with no further steps to resolve the issue. Do you have any information regarding this? Is it possible in Kafka to somehow reread the topic and drop the corrupt message? Regards, Nicolas PHUNG
Re: New consumer - consumer group init
Thanks all for fast feedback! It's great news if that aspect is improved as well in new HLC. I will test and get back with any related findings. Kind regards, Stevo Slavic. On Mon, Jul 20, 2015 at 9:57 PM, Guozhang Wang wangg...@gmail.com wrote: Hi Stevo, I am still not very clear on your point yet, I guess I was trying to figure out under which circumstances would users prefer to re-set the group id at an existing consumer rather than creating a new instance. As Jason mentioned, since the new consumer is single threaded it should usually be cheap to construct. Guozhang On Mon, Jul 20, 2015 at 11:06 AM, Jason Gustafson ja...@confluent.io wrote: Hey Stevo, The new consumer doesn't have any threads of its own, so I think construction should be fairly cheap. -Jason On Sun, Jul 19, 2015 at 2:13 PM, Stevo Slavić ssla...@gmail.com wrote: Hello Guozhang, It would be enough if consumer group could, besides at construction time, be set once only after construction. Have to retest, but high level consumer in 0.8.1 used to be very heavy weight object (lots of threads started, and it would block and take time to construct it). It's understandable, considering all of the high level features it has, and since it's supposed to be long living object. What would improve with this change is that construction penalty could be paid upfront, while topic subscription and joining consumer group ensemble could be done on first use, so that first use does not have to suffer from both init and subscription penalties. It would be nice also if consumer group just as subscription could be changed later even, so multiple times throughout lifetime of high level consumer instance, to avoid constructing new consumer when instance purpose changes. After looking more into the HLC API, thought maybe this is not needed, since there is public void subscribe(TopicPartition... 
partitions) which does not use consumer group management, but problem is that there is no matching explicit commit where one could pass consumer group parameter as well, to label for which consumer group should offset(s) be committed. Seems like new HLC has split personality. Maybe (at least) two APIs could have been provided instead of one with such differing behaviors. Kind regards, Stevo Slavic. On Sun, Jul 19, 2015 at 12:01 AM, Guozhang Wang wangg...@gmail.com wrote: Hi Stevo, Hmm this is interesting, do you have any use cases in mind that need dynamic group changing? Guozhang On Fri, Jul 17, 2015 at 11:13 PM, Stevo Slavić ssla...@gmail.com wrote: Hello Apache Kafka community, In new KafkaConsumer API on trunk, it seems it's only possible to define consumer group id at construction time of KafkaConsumer, through property with group.id key. Would it make sense and be possible to support setting/changing consumer group id after construction, but before it's actually used for the first time, similar to how subscription is supported through public void subscribe(String... topics)? Maybe this can be done through additional method public void subscribe(String consumerGroupId, String... topics) which would first set provided consumer group id in coordinator and then call public void subscribe(String... topics). Kind regards, Stevo Slavic. -- -- Guozhang -- -- Guozhang
New consumer - ConsumerRecords partitions
Hello Apache Kafka community, New HLC poll returns ConsumerRecords. Do ConsumerRecords contain records for every partition that HLC is actively subscribed on for every poll request, or does it contain only records for partitions which had messages and which were retrieved in poll request? If the latter, then please consider adding a method to the ConsumerRecords class, public Iterable<TopicPartition> getPartitions(), returning the partitions that the ConsumerRecords has. I could provide a PR. Kind regards, Stevo Slavic.
New consumer - poll/seek javadoc confusing, need clarification
Hello Apache Kafka community, I find new consumer poll/seek javadoc a bit confusing. Just by reading docs I'm not sure what the outcome will be, what is expected in following scenario: - kafkaConsumer is instantiated with auto-commit off - kafkaConsumer.subscribe(someTopic) - kafkaConsumer.position is called for every TopicPartition HLC is actively subscribed on and then when doing multiple poll calls in succession (without calling commit), does seek have to be called in between poll calls to position HLC to skip what was read in previous poll, or does HLC keep that state (position after poll) in memory, so that next poll (without seek in between two poll calls) will continue from where last poll stopped? Could be it's just me not understanding this from javadoc. If not, maybe javadoc can be improved to make this (even) more obvious. Kind regards, Stevo Slavic.
New producer hangs infinitely when it loses connection to Kafka cluster
Hello Apache Kafka community, Just noticed that when: - a message is successfully published using the new 0.8.2.1 producer - and then Kafka is stopped, the next attempt to publish a message using the same instance of the new producer hangs forever, and the following stack trace gets logged repeatedly: [WARN ] [o.a.kafka.common.network.Selector] [] Error in I/O with localhost/127.0.0.1 java.net.ConnectException: Connection refused at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) ~[na:1.8.0_31] at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:716) ~[na:1.8.0_31] at org.apache.kafka.common.network.Selector.poll(Selector.java:238) ~[kafka-clients-0.8.2.1.jar:na] at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:192) [kafka-clients-0.8.2.1.jar:na] at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:191) [kafka-clients-0.8.2.1.jar:na] at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:122) [kafka-clients-0.8.2.1.jar:na] at java.lang.Thread.run(Thread.java:745) [na:1.8.0_31] I expect the producer to respect timeout settings even in this connection-lost scenario. Is this a known bug? Is there something I can do/configure as a workaround? Kind regards, Stevo Slavic.
broker data directory
Just wanna make sure, in server.properties, the configuration log.dirs=/tmp/kafka-logs specifies the directory where the log (data) is stored, right? If I want the data to be saved elsewhere, this is the configuration I need to change, right? Thanks for answering. best,