Re: New consumer - ConsumerRecords partitions

2015-07-21 Thread Jason Gustafson
Hey Stevo,

I think ConsumerRecords only contains the partitions which had messages.
Would you mind creating a jira for the feature request? You're welcome to
submit a patch as well.
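
For reference, a minimal sketch of how such an accessor could be used once
added. The partitions() name is hypothetical here (ConsumerRecords had no such
method at the time), as is records(TopicPartition):

import java.util.List;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;

ConsumerRecords<String, String> records = consumer.poll(100);
// only partitions that actually returned messages would be present
for (TopicPartition tp : records.partitions()) {
    List<ConsumerRecord<String, String>> partitionRecords = records.records(tp);
    // process partitionRecords for this partition
}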

-Jason

On Tue, Jul 21, 2015 at 2:27 AM, Stevo Slavić ssla...@gmail.com wrote:

 Hello Apache Kafka community,

 New HLC poll returns ConsumerRecords.

 Do ConsumerRecords contain records for every partition that the HLC is
 actively subscribed to, for every poll request, or do they contain only
 records for partitions which had messages retrieved in that poll request?

 If the latter, then please consider adding a method to the ConsumerRecords
 class, public Iterable<TopicPartition> getPartitions(), returning the
 partitions that the ConsumerRecords has. I could provide a PR.

 Kind regards,
 Stevo Slavic.



Re: broker data directory

2015-07-21 Thread Yuheng Du
Thank you, Nicolas!

On Tue, Jul 21, 2015 at 10:46 AM, Nicolas Phung nicolas.ph...@gmail.com
wrote:

 Yes indeed.

 # A comma separated list of directories under which to store log files
 log.dirs=/var/lib/kafka

 You can use several disks/partitions too.
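
 For example, a sketch with multiple data directories (paths are just
 placeholders):

 log.dirs=/data/disk1/kafka-logs,/data/disk2/kafka-logs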

 Regards,

 On Tue, Jul 21, 2015 at 4:37 PM, Yuheng Du yuheng.du.h...@gmail.com
 wrote:

  Just wanna make sure, in server.properties, the configuration
  log.dirs=/tmp/kafka-logs
 
  specifies the directory where the log (data) is stored, right?
 
  If I want the data to be saved elsewhere, this is the configuration I
 need
  to change, right?
 
  Thanks for answering.
 
  best,
 



Re: New producer hangs infinitely when it loses connection to Kafka cluster

2015-07-21 Thread Ewen Cheslack-Postava
This is a known issue. There are a few relevant JIRAs and a KIP:

https://issues.apache.org/jira/browse/KAFKA-1788
https://issues.apache.org/jira/browse/KAFKA-2120
https://cwiki.apache.org/confluence/display/KAFKA/KIP-19+-+Add+a+request+timeout+to+NetworkClient
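
Until that lands, a partial client-side workaround is to bound the wait on the
future that send() returns. A sketch only (topic, values, and timeout are
placeholders, and it does not help if send() itself blocks, e.g. on metadata):

import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

// producer is an existing KafkaProducer<String, String>
Future<RecordMetadata> future =
    producer.send(new ProducerRecord<>("my-topic", "key", "value"));
try {
    // fail fast instead of hanging forever when the cluster is unreachable
    RecordMetadata metadata = future.get(30, TimeUnit.SECONDS);
} catch (TimeoutException e) {
    // cluster unreachable: retry, buffer, or surface the error
} catch (InterruptedException | ExecutionException e) {
    // interrupted, or the send itself failed
}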

-Ewen

On Tue, Jul 21, 2015 at 7:05 AM, Stevo Slavić ssla...@gmail.com wrote:

 Hello Apache Kafka community,

 Just noticed that :
 - message is successfully published using new 0.8.2.1 producer
 - and then Kafka is stopped

 the next attempt to publish a message using the same instance of the new
 producer hangs forever, and the following stacktrace gets logged repeatedly:

 [WARN ] [o.a.kafka.common.network.Selector] [] Error in I/O with localhost/127.0.0.1
 java.net.ConnectException: Connection refused
 at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) ~[na:1.8.0_31]
 at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:716) ~[na:1.8.0_31]
 at org.apache.kafka.common.network.Selector.poll(Selector.java:238) ~[kafka-clients-0.8.2.1.jar:na]
 at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:192) [kafka-clients-0.8.2.1.jar:na]
 at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:191) [kafka-clients-0.8.2.1.jar:na]
 at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:122) [kafka-clients-0.8.2.1.jar:na]
 at java.lang.Thread.run(Thread.java:745) [na:1.8.0_31]


 I expect the producer to respect timeout settings even in this
 connection-lost scenario.

 Is this a known bug? Is there something I can do/configure as a workaround?

 Kind regards,
 Stevo Slavic.




-- 
Thanks,
Ewen


Re: New consumer - poll/seek javadoc confusing, need clarification

2015-07-21 Thread Ewen Cheslack-Postava
On Tue, Jul 21, 2015 at 2:38 AM, Stevo Slavić ssla...@gmail.com wrote:

 Hello Apache Kafka community,

 I find the new consumer poll/seek javadoc a bit confusing. Just by reading
 the docs I'm not sure what the outcome will be, what is expected in the
 following scenario:

 - kafkaConsumer is instantiated with auto-commit off
 - kafkaConsumer.subscribe(someTopic)
 - kafkaConsumer.position is called for every TopicPartition HLC is actively
 subscribed on

 and then when doing multiple poll calls in succession (without calling
 commit), does seek have to be called in between poll calls to position HLC
 to skip what was read in previous poll, or does HLC keep that state
 (position after poll) in memory, so that next poll (without seek in between
 two poll calls) will continue from where last poll stopped?


The position is tracked in-memory within the consumer, so as long as there
isn't a consumer rebalance, consumption will just proceed with subsequent
messages (i.e. the behavior I think most people would find intuitive).
However, if a rebalance occurs (another consumer instance joins the group
or some leave), then a partition may be assigned to a different consumer
instance that has no idea about the current position and will restart based
on the offset reset setting (because attempting to fetch the committed
offset will fail since no offsets have been committed).
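
To make that concrete, a minimal poll loop with auto-commit off might look like
the sketch below (written against the new consumer API; names like commitSync()
follow the eventually released API and may differ slightly on the trunk of the
time):

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "my-group");
props.put("enable.auto.commit", "false");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("some-topic"));
while (true) {
    // each poll resumes from the in-memory position; no seek() needed in between
    ConsumerRecords<String, String> records = consumer.poll(100);
    // ... process records ...
    consumer.commitSync(); // committed offsets protect against rebalance/restart
}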

-Ewen


 Could be it's just me not understanding this from javadoc. If not, maybe
 javadoc can be improved to make this (even) more obvious.

 Kind regards,
 Stevo Slavic.




-- 
Thanks,
Ewen


Re: Retrieving lost messages produced while the consumer was down.

2015-07-21 Thread Ewen Cheslack-Postava
Since you mentioned consumer groups, I'm assuming you're using the high
level consumer? Do you have auto.commit.enable set to true?

It sounds like when you start up you are always getting the
auto.offset.reset behavior, which indicates you don't have any offsets
committed. By default, that behavior is to consume from the latest offset
(which would only get messages produced after the consumer starts).

To get the behavior you're looking for, you should make sure to commit
offsets when you're shutting down your consumer so it will resume where you
left off the next time you start it. Unless you are using the
SimpleConsumer, you shouldn't need to explicitly make any requests yourself.
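
With the 0.8.x high-level consumer, that amounts to something like this sketch
(ZooKeeper address and group id are placeholders):

import java.util.Properties;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.javaapi.consumer.ConsumerConnector;

Properties props = new Properties();
props.put("zookeeper.connect", "localhost:2181");
props.put("group.id", "my-group");
props.put("auto.commit.enable", "true"); // commit offsets periodically
ConsumerConnector connector =
    Consumer.createJavaConsumerConnector(new ConsumerConfig(props));

// ... consume via connector.createMessageStreams(...) ...

// on shutdown: commit the final offsets, then close, so the next start
// resumes where this one left off
connector.commitOffsets();
connector.shutdown();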


On Tue, Jul 21, 2015 at 2:24 PM, Tomas Niño Kehoe tomasninoke...@gmail.com
wrote:

 Hi,

 We've been using Kafka for a couple of months, and now we're trying to
 write a simple application using the ConsumerGroup to fully understand
 Kafka.

 Having the producer continually writing data, our consumer occasionally
 needs to be restarted. However, once the program is brought back up,
 messages which were produced during that period of time are not being
 read. Instead, the consumer (this is a single consumer inside a consumer
 group) will read the messages produced after it was brought back up. Its
 configuration doesn't change at all.

 For example using the simple consumer/producer apps:

 Produced 1, 2, 3, 4, 5
 Consumed 1, 2, 3, 4, 5

 [Stop the consumer]
 Produce 20, 21, 22, 23

 When the consumer is brought back up, I'd like to get 20, 21, 22, 23, but I
 will only get either new messages, or all the messages using
 (--from-beginning).

 Is there a way of achieving this programmatically, without for example
 writing an offset into the zookeeper node? Is the OffsetCommitRequest the
 way to go?

 Thanks in advance


 Tomás




-- 
Thanks,
Ewen


Retrieving lost messages produced while the consumer was down.

2015-07-21 Thread Tomas Niño Kehoe
Hi,

We've been using Kafka for a couple of months, and now we're trying to
write a simple application using the ConsumerGroup to fully understand
Kafka.

Having the producer continually writing data, our consumer occasionally
needs to be restarted. However, once the program is brought back up,
messages which were produced during that period of time are not being
read. Instead, the consumer (this is a single consumer inside a consumer
group) will read the messages produced after it was brought back up. Its
configuration doesn't change at all.

For example using the simple consumer/producer apps:

Produced 1, 2, 3, 4, 5
Consumed 1, 2, 3, 4, 5

[Stop the consumer]
Produce 20, 21, 22, 23

When the consumer is brought back up, I'd like to get 20, 21, 22, 23, but I
will only get either new messages, or all the messages using
(--from-beginning).

Is there a way of achieving this programmatically, without for example
writing an offset into the zookeeper node? Is the OffsetCommitRequest the
way to go?

Thanks in advance


Tomás


Implementing a custom partitioner

2015-07-21 Thread JIEFU GONG
Hi all,

If I wanted to write my own partitioner, all I would need to do is write a
class that extends Partitioner and override the partition function,
correct? I am currently doing so, at least, with a class in the package
'services', yet when I use:

properties.put("partitioner.class", "services.myPartitioner");

and instantiate my producer, this doesn't work properly. I'm using a simple
switch statement, so I am led to believe that I have not improperly written
my partitioner. After attempting to debug the issue, I
notice that the constructor I'm entering when attempting to instantiate the
producer has the line:

this.partitioner = new Partitioner();

which more or less ignores my input. Any ideas? Help is appreciated!




-- 

Jiefu Gong
University of California, Berkeley | Class of 2017
B.A Computer Science | College of Letters and Sciences

jg...@berkeley.edu elise...@berkeley.edu | (925) 400-3427


Re: Implementing a custom partitioner

2015-07-21 Thread Sriharsha Chintalapani
Hi,
     Are you using the latest trunk for the Producer API? Did you implement
the interface here
https://cwiki.apache.org/confluence/display/KAFKA/KIP-+22+-+Expose+a+Partitioner+interface+in+the+new+producer
-- 
Harsha


On July 21, 2015 at 2:27:05 PM, JIEFU GONG (jg...@berkeley.edu) wrote:

Hi all,  

If I wanted to write my own partitioner, all I would need to do is write a  
class that extends Partitioner and override the partition function,  
correct? I am currently doing so, at least, with a class in the package  
'services', yet when I use:  

properties.put("partitioner.class", "services.myPartitioner");

and instantiate my producer, this doesn't work properly. I'm using a simple  
switch statement, so I am led to believe that I have not improperly written  
my partitioner. After attempting to debug the issue, I  
notice that the constructor I'm entering when attempting to instantiate the  
producer has the line:  

this.partitioner = new Partitioner();  

which more or less ignores my input. Any ideas? Help is appreciated!  




--  

Jiefu Gong  
University of California, Berkeley | Class of 2017  
B.A Computer Science | College of Letters and Sciences  

jg...@berkeley.edu elise...@berkeley.edu | (925) 400-3427  


Re: Implementing a custom partitioner

2015-07-21 Thread JIEFU GONG
Sriharsha, thanks for your response. I'm using version 0.8.2, and I am
implementing kafka.producer.Partitioner.

I noticed that in the latest trunk the line I specified above is replaced
with:
this.partitioner = config.getConfiguredInstance(
    ProducerConfig.PARTITIONER_CLASS_CONFIG, Partitioner.class);

does this mean I cannot use my own partitioner in v 0.8.2?


On Tue, Jul 21, 2015 at 2:48 PM, Sriharsha Chintalapani ka...@harsha.io
wrote:

 Hi,
  Are you using the latest trunk for the Producer API? Did you implement
  the interface here
 https://cwiki.apache.org/confluence/display/KAFKA/KIP-+22+-+Expose+a+Partitioner+interface+in+the+new+producer
 --
 Harsha


 On July 21, 2015 at 2:27:05 PM, JIEFU GONG (jg...@berkeley.edu) wrote:

 Hi all,

 If I wanted to write my own partitioner, all I would need to do is write a
 class that extends Partitioner and override the partition function,
 correct? I am currently doing so, at least, with a class in the package
 'services', yet when I use:

 properties.put("partitioner.class", "services.myPartitioner");

 and instantiate my producer, this doesn't work properly. I'm using a
 simple
 switch statement, so I am led to believe that I have not improperly
 written
 my partitioner. After attempting to debug the issue, I
 notice that the constructor I'm entering when attempting to instantiate
 the
 producer has the line:

 this.partitioner = new Partitioner();

 which more or less ignores my input. Any ideas? Help is appreciated!




 --

 Jiefu Gong
 University of California, Berkeley | Class of 2017
 B.A Computer Science | College of Letters and Sciences

 jg...@berkeley.edu elise...@berkeley.edu | (925) 400-3427




-- 

Jiefu Gong
University of California, Berkeley | Class of 2017
B.A Computer Science | College of Letters and Sciences

jg...@berkeley.edu elise...@berkeley.edu | (925) 400-3427


Re: Implementing a custom partitioner

2015-07-21 Thread Sriharsha Chintalapani
If you are using the new producer API from Kafka 0.8.2, there is no pluggable
partitioner in it; for that you need the latest trunk. But in 0.8.2, if you are
using the old producer code, you can configure a pluggable partitioner
https://github.com/apache/kafka/blob/0.8.2/core/src/main/scala/kafka/producer/ProducerConfig.scala#L69
by implementing this interface
https://github.com/apache/kafka/blob/0.8.2/core/src/main/scala/kafka/producer/Partitioner.scala

and it gets created here
https://github.com/apache/kafka/blob/0.8.2/core/src/main/scala/kafka/producer/Producer.scala#L61
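
For illustration, a custom partitioner for the old producer might look like the
sketch below (the services package and routing logic mirror the question; note
the VerifiableProperties constructor, which the reflective instantiation
requires):

package services;

import kafka.producer.Partitioner;
import kafka.utils.VerifiableProperties;

public class MyPartitioner implements Partitioner {
    // the old producer instantiates this reflectively, passing the config props
    public MyPartitioner(VerifiableProperties props) { }

    @Override
    public int partition(Object key, int numPartitions) {
        // mask the sign bit so the result stays non-negative
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }
}

and wired up with:

properties.put("partitioner.class", "services.MyPartitioner");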

Thanks,
Harsha


On July 21, 2015 at 2:54:05 PM, JIEFU GONG (jg...@berkeley.edu) wrote:

Sriharsha, thanks for your response. I'm using version 0.8.2, and I am 
implementing kafka.producer.Partitioner. 

I noticed that in the latest trunk the line I specified above is replaced with:

this.partitioner = config.getConfiguredInstance(
    ProducerConfig.PARTITIONER_CLASS_CONFIG, Partitioner.class);

does this mean I cannot use my own partitioner in v 0.8.2?


On Tue, Jul 21, 2015 at 2:48 PM, Sriharsha Chintalapani ka...@harsha.io wrote:
Hi,
     Are you using the latest trunk for the Producer API? Did you implement
the interface here
https://cwiki.apache.org/confluence/display/KAFKA/KIP-+22+-+Expose+a+Partitioner+interface+in+the+new+producer
-- 
Harsha


On July 21, 2015 at 2:27:05 PM, JIEFU GONG (jg...@berkeley.edu) wrote:

Hi all,

If I wanted to write my own partitioner, all I would need to do is write a
class that extends Partitioner and override the partition function,
correct? I am currently doing so, at least, with a class in the package
'services', yet when I use:

properties.put("partitioner.class", "services.myPartitioner");

and instantiate my producer, this doesn't work properly. I'm using a simple
switch statement, so I am led to believe that I have not improperly written
my partitioner. After attempting to debug the issue, I
notice that the constructor I'm entering when attempting to instantiate the
producer has the line:

this.partitioner = new Partitioner();

which more or less ignores my input. Any ideas? Help is appreciated!




--

Jiefu Gong
University of California, Berkeley | Class of 2017
B.A Computer Science | College of Letters and Sciences

jg...@berkeley.edu elise...@berkeley.edu | (925) 400-3427



--
Jiefu Gong
University of California, Berkeley | Class of 2017
B.A Computer Science | College of Letters and Sciences
jg...@berkeley.edu | (925) 400-3427


ZK chroot path would be automatically created since Kafka 0.8.2.0?

2015-07-21 Thread yewton
Hi,

The documentation for zookeeper.connect in Broker Configs says "Note that
you must create this path yourself prior to starting the broker", but it
seems the broker creates the path automatically on startup
(maybe related issue: https://issues.apache.org/jira/browse/KAFKA-404 ).

So is the documentation just out of date?
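
For reference, the chroot is the optional path suffix on the connection
string, e.g. (hosts are placeholders):

zookeeper.connect=zk1:2181,zk2:2181,zk3:2181/kafka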

Thanks,
Yuto Sasaki


Re: New consumer - poll/seek javadoc confusing, need clarification

2015-07-21 Thread James Cheng

 On Jul 21, 2015, at 9:15 AM, Ewen Cheslack-Postava e...@confluent.io wrote:
 
 On Tue, Jul 21, 2015 at 2:38 AM, Stevo Slavić ssla...@gmail.com wrote:
 
 Hello Apache Kafka community,
 
  I find the new consumer poll/seek javadoc a bit confusing. Just by reading
  the docs I'm not sure what the outcome will be, what is expected in the
  following scenario:
 
 - kafkaConsumer is instantiated with auto-commit off
 - kafkaConsumer.subscribe(someTopic)
 - kafkaConsumer.position is called for every TopicPartition HLC is actively
 subscribed on
 
 and then when doing multiple poll calls in succession (without calling
 commit), does seek have to be called in between poll calls to position HLC
 to skip what was read in previous poll, or does HLC keep that state
 (position after poll) in memory, so that next poll (without seek in between
 two poll calls) will continue from where last poll stopped?
 
 
 The position is tracked in-memory within the consumer, so as long as there
 isn't a consumer rebalance, consumption will just proceed with subsequent
 messages (i.e. the behavior I think most people would find intuitive).
 However, if a rebalance occurs (another consumer instance joins the group
  or some leave), then a partition may be assigned to a different consumer
 instance that has no idea about the current position and will restart based
 on the offset reset setting (because attempting to fetch the committed
 offset will fail since no offsets have been committed).
 

Ewen,

What happens if there is a broker failure and a new broker becomes the 
partition leader? Does the high level consumer start listening to the new 
partition leader at the in-memory position, or does it restart based on saved 
offsets?

Thanks,
-James

 -Ewen
 
 
 Could be it's just me not understanding this from javadoc. If not, maybe
 javadoc can be improved to make this (even) more obvious.
 
 Kind regards,
 Stevo Slavic.
 
 
 
 
 -- 
 Thanks,
 Ewen



Re: Issue with corrupt message in Topic

2015-07-21 Thread Adam Dubiel
Hi Nicolas,

From my experience there are only two ways out:
1) wait for retention time to pass, so data gets deleted (this is usually
unacceptable)
2) trace the offset of the corrupt message on all affected subscriptions and
skip it by overwriting the committed offset with offset+1

The problem is that when it encounters a corrupt message, the high level
consumer iterator goes into an invalid state and closes. There is no way to
skip the message or recover from it without skipping offsets. You might try
to use SimpleConsumer though. Maybe someone knows other ways to deal with this
problem, but we haven't found any.
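
For option 2, with 0.8.x ZooKeeper-based offsets, the overwrite can be done by
rewriting the group's offset node directly. A rough sketch only (group, topic,
and partition are placeholders; stop the consumer first):

import kafka.utils.ZKStringSerializer$;
import org.I0Itec.zkclient.ZkClient;

ZkClient zk = new ZkClient("localhost:2181", 10000, 10000,
    ZKStringSerializer$.MODULE$);
String path = "/consumers/my-group/offsets/my-topic/0";
String current = zk.readData(path);
long committed = Long.parseLong(current);
// skip the corrupt message by committing the offset just past it
zk.writeData(path, Long.toString(committed + 1));
zk.close();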

BR,
Adam

2015-07-21 9:38 GMT+02:00 Nicolas Phung nicolas.ph...@gmail.com:

 Hello,

 I'm using Confluent Kafka (0.8.2.0-cp). When I try to process messages
 from my Kafka topic with Spark Streaming, I get the following error:

 kafka.message.InvalidMessageException: Message is corrupt (stored crc = 3561357254, computed crc = 171652633)
 at kafka.message.Message.ensureValid(Message.scala:166)
 at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:102)
 at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:33)
 at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:66)
 at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:58)
 at org.apache.spark.streaming.kafka.ReliableKafkaReceiver$MessageHandler.run(ReliableKafkaReceiver.scala:265)
 at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
 at java.util.concurrent.FutureTask.run(FutureTask.java:266)
 at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
 at java.lang.Thread.run(Thread.java:745)
 15/07/20 15:56:57 INFO BlockManager: Removing broadcast 4641
 15/07/20 15:56:57 ERROR ReliableKafkaReceiver: Error handling message
 java.lang.IllegalStateException: Iterator is in failed state
 at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:54)
 at org.apache.spark.streaming.kafka.ReliableKafkaReceiver$MessageHandler.run(ReliableKafkaReceiver.scala:265)
 at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
 at java.util.concurrent.FutureTask.run(FutureTask.java:266)
 at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
 at java.lang.Thread.run(Thread.java:745)

 From my understanding, there are some corrupt messages in my topic. I'm
 using the new Producer API to send messages compressed with Snappy. I found
 an old thread talking about it but with no further steps to resolve the
 issue. Do you have any information regarding this?

 Is it possible in Kafka to somehow reread the topic and drop the corrupt
 messages?

 Regards,
 Nicolas PHUNG



Issue with corrupt message in Topic

2015-07-21 Thread Nicolas Phung
Hello,

I'm using Confluent Kafka (0.8.2.0-cp). When I try to process messages
from my Kafka topic with Spark Streaming, I get the following error:

kafka.message.InvalidMessageException: Message is corrupt (stored crc = 3561357254, computed crc = 171652633)
at kafka.message.Message.ensureValid(Message.scala:166)
at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:102)
at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:33)
at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:66)
at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:58)
at org.apache.spark.streaming.kafka.ReliableKafkaReceiver$MessageHandler.run(ReliableKafkaReceiver.scala:265)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
15/07/20 15:56:57 INFO BlockManager: Removing broadcast 4641
15/07/20 15:56:57 ERROR ReliableKafkaReceiver: Error handling message
java.lang.IllegalStateException: Iterator is in failed state
at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:54)
at org.apache.spark.streaming.kafka.ReliableKafkaReceiver$MessageHandler.run(ReliableKafkaReceiver.scala:265)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

From my understanding, there are some corrupt messages in my topic. I'm
using the new Producer API to send messages compressed with Snappy. I found
an old thread talking about it but with no further steps to resolve the
issue. Do you have any information regarding this?

Is it possible in Kafka to somehow reread the topic and drop the corrupt
messages?

Regards,
Nicolas PHUNG


Re: New consumer - consumer group init

2015-07-21 Thread Stevo Slavić
Thanks all for fast feedback!

It's great news if that aspect is improved as well in the new HLC. I will test
and get back with any related findings.

Kind regards,
Stevo Slavic.
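
For reference, since construction is cheap, the pattern suggested below amounts
to simply building one consumer per group id. A sketch (servers and group names
are placeholders):

import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");

props.put("group.id", "group-a"); // group id is fixed at construction time
KafkaConsumer<byte[], byte[]> consumerA = new KafkaConsumer<>(props);

// to work under another group, construct a new instance rather than mutating
props.put("group.id", "group-b");
KafkaConsumer<byte[], byte[]> consumerB = new KafkaConsumer<>(props);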

On Mon, Jul 20, 2015 at 9:57 PM, Guozhang Wang wangg...@gmail.com wrote:

 Hi Stevo,

 I am still not very clear on your point yet; I guess I was trying to figure
 out under which circumstances users would prefer to re-set the group id on
 an existing consumer rather than creating a new instance. As Jason
 mentioned, since the new consumer is single threaded it should usually be
 cheap to construct.

 Guozhang

 On Mon, Jul 20, 2015 at 11:06 AM, Jason Gustafson ja...@confluent.io
 wrote:

  Hey Stevo,
 
  The new consumer doesn't have any threads of its own, so I think
  construction should be fairly cheap.
 
  -Jason
 
  On Sun, Jul 19, 2015 at 2:13 PM, Stevo Slavić ssla...@gmail.com wrote:
 
   Hello Guozhang,
  
    It would be enough if the consumer group could, besides at construction
    time, be set once only after construction. I have to retest, but the high
    level consumer in 0.8.1 used to be a very heavyweight object (lots of
    threads started, and it would block and take time to construct). It's
    understandable, considering all of the high level features it has, and
    since it's supposed to be a long-lived object. What would improve with
    this change is that the construction penalty could be paid upfront, while
    topic subscription and joining the consumer group ensemble could be done
    on first use, so that first use does not have to suffer from both init
    and subscription penalties.
  
    It would also be nice if the consumer group, just like the subscription,
    could be changed later, even multiple times throughout the lifetime of a
    high level consumer instance, to avoid constructing a new consumer when
    the instance's purpose changes.
  
    After looking more into the HLC API, I thought maybe this is not needed,
    since there is public void subscribe(TopicPartition... partitions) which
    does not use consumer group management, but the problem is that there is
    no matching explicit commit where one could pass a consumer group
    parameter as well, to label for which consumer group the offset(s) should
    be committed.
  
    Seems like the new HLC has a split personality. Maybe (at least) two APIs
    could have been provided instead of one with such differing behaviors.
  
   Kind regards,
   Stevo Slavic.
  
   On Sun, Jul 19, 2015 at 12:01 AM, Guozhang Wang wangg...@gmail.com
   wrote:
  
Hi Stevo,
   
Hmm this is interesting, do you have any use cases in mind that need
dynamic group changing?
   
Guozhang
   
On Fri, Jul 17, 2015 at 11:13 PM, Stevo Slavić ssla...@gmail.com
   wrote:
   
 Hello Apache Kafka community,

  In the new KafkaConsumer API on trunk, it seems it's only possible to
  define the consumer group id at construction time of KafkaConsumer,
  through the property with the group.id key.

  Would it make sense and be possible to support setting/changing the
  consumer group id after construction, but before it's actually used for
  the first time, similar to how subscription is supported through public
  void subscribe(String... topics)?

  Maybe this can be done through an additional method, public void
  subscribe(String consumerGroupId, String... topics), which would first
  set the provided consumer group id in the coordinator and then call
  public void subscribe(String... topics).

 Kind regards,
 Stevo Slavic.

   
   
   
--
-- Guozhang
   
  
 



 --
 -- Guozhang



New consumer - ConsumerRecords partitions

2015-07-21 Thread Stevo Slavić
Hello Apache Kafka community,

New HLC poll returns ConsumerRecords.

Do ConsumerRecords contain records for every partition that the HLC is
actively subscribed to, for every poll request, or do they contain only
records for partitions which had messages retrieved in that poll request?

If the latter, then please consider adding a method to the ConsumerRecords
class, public Iterable<TopicPartition> getPartitions(), returning the
partitions that the ConsumerRecords has. I could provide a PR.

Kind regards,
Stevo Slavic.


New consumer - poll/seek javadoc confusing, need clarification

2015-07-21 Thread Stevo Slavić
Hello Apache Kafka community,

I find the new consumer poll/seek javadoc a bit confusing. Just by reading
the docs I'm not sure what the outcome will be, what is expected in the
following scenario:

- kafkaConsumer is instantiated with auto-commit off
- kafkaConsumer.subscribe(someTopic)
- kafkaConsumer.position is called for every TopicPartition HLC is actively
subscribed on

and then when doing multiple poll calls in succession (without calling
commit), does seek have to be called in between poll calls to position HLC
to skip what was read in previous poll, or does HLC keep that state
(position after poll) in memory, so that next poll (without seek in between
two poll calls) will continue from where last poll stopped?

Could be it's just me not understanding this from javadoc. If not, maybe
javadoc can be improved to make this (even) more obvious.

Kind regards,
Stevo Slavic.


New producer hangs infinitely when it loses connection to Kafka cluster

2015-07-21 Thread Stevo Slavić
Hello Apache Kafka community,

Just noticed that :
- message is successfully published using new 0.8.2.1 producer
- and then Kafka is stopped

the next attempt to publish a message using the same instance of the new
producer hangs forever, and the following stacktrace gets logged repeatedly:

[WARN ] [o.a.kafka.common.network.Selector] [] Error in I/O with localhost/127.0.0.1
java.net.ConnectException: Connection refused
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) ~[na:1.8.0_31]
at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:716) ~[na:1.8.0_31]
at org.apache.kafka.common.network.Selector.poll(Selector.java:238) ~[kafka-clients-0.8.2.1.jar:na]
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:192) [kafka-clients-0.8.2.1.jar:na]
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:191) [kafka-clients-0.8.2.1.jar:na]
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:122) [kafka-clients-0.8.2.1.jar:na]
at java.lang.Thread.run(Thread.java:745) [na:1.8.0_31]


I expect the producer to respect timeout settings even in this
connection-lost scenario.

Is this a known bug? Is there something I can do/configure as a workaround?

Kind regards,
Stevo Slavic.


broker data directory

2015-07-21 Thread Yuheng Du
Just wanna make sure, in server.properties, the configuration
log.dirs=/tmp/kafka-logs

specifies the directory where the log (data) is stored, right?

If I want the data to be saved elsewhere, this is the configuration I need
to change, right?

Thanks for answering.

best,