Re: Questions about new consumer API

2015-11-18 Thread hsy...@gmail.com
That sounds like a good suggestion. I'm actually looking at the code and I
will start another thread for questions about that.

On Tue, Nov 17, 2015 at 5:42 PM, Jason Gustafson  wrote:

> Thanks for the explanation. Certainly you'd use fewer connections with this
> approach, but it might be worthwhile to do some performance analysis to see
> whether there is much difference in throughput (I'd be interested in seeing
> these results myself). Another approach that might be interesting would be
> to implement your own partition assignor which took into account the
> leaders of each partition. Then you could just use subscribe() and let
> Kafka manage the group for you. This is similar to how we were thinking of
> implementing consumer rack-awareness.
>
> -Jason
>
> On Tue, Nov 17, 2015 at 4:04 PM, hsy...@gmail.com 
> wrote:
>
> > By efficiency, I mean maximizing throughput while minimizing resources on
> > both the broker side and the consumer side.
> >
> > One example: if you have over 200 partitions on 10 brokers and you can
> > start 5 consumer processes to consume the data, and each one is
> > single-threaded with round-robin used to distribute the load, then each
> > one will try to fetch from over 40 partitions one by one through possibly
> > 10 connections (50 overall). But if it's smart enough to group partitions
> > by broker, each process can have 2 separate threads (consuming from 2
> > different brokers concurrently). That seems the more optimal solution of
> > the two, right?
> >
> > On Tue, Nov 17, 2015 at 2:54 PM, Jason Gustafson 
> > wrote:
> >
> > > Hi Siyuan,
> > >
> > > Your understanding about assign/subscribe is correct. We think of topic
> > > subscription as enabling automatic assignment, as opposed to doing
> > > manual assignment through assign(). We don't currently allow them to
> > > be mixed.
> > >
> > > Can you elaborate on your findings with respect to using one thread
> > > per broker? In what sense was it more efficient? Doing the same thing
> > > might be tricky with the new consumer, but I think you could do it
> > > using partitionsFor() to find the current partition leaders and
> > > assign() to set the assignment in each thread.
> > >
> > > -Jason
> > >
> > > On Tue, Nov 17, 2015 at 10:25 AM, hsy...@gmail.com 
> > > wrote:
> > >
> > > > Thanks Guozhang,
> > > >
> > > > Maybe I should give a few words about what I'm trying to achieve with
> > > > the new API.
> > > >
> > > > Currently, I'm building a new Kafka connector for Apache Apex (
> > > > http://apex.incubator.apache.org/) using the 0.9.0 API.
> > > > Apex supports dynamic partitioning, so with the old version we manage
> > > > all the consumer partitions in either a 1:1 strategy (each consumer
> > > > process consumes from only one Kafka partition) or a 1:n strategy
> > > > (each consumer process can consume from multiple Kafka partitions,
> > > > distributed round-robin). We also have a separate thread to monitor
> > > > topic metadata changes (leader broker change, new partition added,
> > > > using internal APIs like ZkUtils) and repartition dynamically based on
> > > > that (for example, auto-reconnect to a new leader broker, or create a
> > > > new consumer partition for a new Kafka partition at runtime). As you
> > > > can see, the high-level consumer doesn't work for this (it can only
> > > > balance between existing consumers unless you manually add a new
> > > > one). I'm wondering whether the new consumer could save some of the
> > > > work we did before.
> > > >
> > > > I'm still confused by assign() and subscribe(). My understanding is
> > > > that if you use assign() only, the consumer becomes more like a
> > > > simple consumer, except that if the leader broker changes it
> > > > automatically reconnects to the new leader. Is that correct? If you
> > > > use the subscribe() method only, then all the partitions will be
> > > > distributed across the running consumer processes with the same
> > > > "group.id" using "partition.assignment.strategy". Is that true?
> > > >
> > > > So I assume assign() and subscribe() (and the group.id and
> > > > partition.assignment.strategy settings) cannot be used together?
> > > >
> > > > Also, with the old API we found one thread per broker to be the most
> > > > efficient way to consume data. For example, if one process consumes
> > > > from p1, p2, p3, and p1, p2 sit on one broker b1 while p3 sits on
> > > > another broker b2, the best approach is to create 2 threads, each
> > > > using the simple consumer API and consuming from only one broker.
> > > > I'm thinking about how to use the new API to do this.
> > > >
> > > > Thanks,
> > > > Siyuan
> > > >
> > > > On Mon, Nov 16, 2015 at 4:43 PM, Guozhang Wang 
> > > wrote:
> > > >
> > > > > Hi Siyuan,
> > > > >
> > > > > 1) The new consumer is single-threaded; it does not maintain any
> > > > > internal threads as the old high-level 
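
For reference, a minimal sketch of the one-thread-per-broker idea discussed
in this thread, against the 0.9.0.0 consumer API: a throwaway consumer
discovers partition leaders via partitionsFor(), then one manually assigned
consumer per leader broker runs in its own thread. The topic name is
illustrative, error handling is omitted, and leader changes would require
re-running the grouping.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;

public class PerBrokerConsumers {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.deserializer",
            "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer",
            "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        // Group the topic's partitions by their current leader broker.
        Map<Node, List<TopicPartition>> byLeader = new HashMap<>();
        try (KafkaConsumer<byte[], byte[]> meta = new KafkaConsumer<>(props)) {
            for (PartitionInfo p : meta.partitionsFor("mytopic")) {
                List<TopicPartition> tps = byLeader.get(p.leader());
                if (tps == null)
                    byLeader.put(p.leader(), tps = new ArrayList<>());
                tps.add(new TopicPartition(p.topic(), p.partition()));
            }
        }

        // One consumer (and one thread) per leader broker, manually assigned.
        for (Map.Entry<Node, List<TopicPartition>> e : byLeader.entrySet()) {
            new Thread(() -> {
                try (KafkaConsumer<byte[], byte[]> c = new KafkaConsumer<>(props)) {
                    c.assign(e.getValue());
                    while (true) {
                        ConsumerRecords<byte[], byte[]> recs = c.poll(1000);
                        for (ConsumerRecord<byte[], byte[]> r : recs)
                            System.out.printf("%s-%d @%d%n",
                                r.topic(), r.partition(), r.offset());
                    }
                }
            }, "consumer-broker-" + e.getKey().id()).start();
        }
    }
}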

Q about PartitionAssignor

2015-11-18 Thread hsy...@gmail.com
Hey guys,

I saw that PartitionAssignor is not in the public API docs and its package
name is internals.

Does that mean this API is not stable and could change even in a minor
release?

And in the assign method signature, the key for the "subscription" map is
memberId. What is memberId, and can I manually set the id to identify a
member? I want to do some sticky assignment.


Thanks!

Best,
Siyuan
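
For context, in 0.9.0.0 the interface in question lives at
org.apache.kafka.clients.consumer.internals.PartitionAssignor, and the keys
of the subscriptions map passed to assign() are the coordinator-assigned
member ids (see the replies below). A rough sticky-leaning sketch against
that interface; the in-memory owner map is a hypothetical stand-in for real
persistent storage, and the balancing of leftover partitions is
deliberately naive.

import java.util.*;
import org.apache.kafka.clients.consumer.internals.PartitionAssignor;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.TopicPartition;

public class StickyishAssignor implements PartitionAssignor {

    // Hypothetical in-memory stand-in for a persistent partition -> memberId
    // store; a real implementation would need to survive leader failover.
    private final Map<TopicPartition, String> previousOwner = new HashMap<>();

    @Override
    public String name() { return "stickyish"; }

    @Override
    public Subscription subscription(Set<String> topics) {
        return new Subscription(new ArrayList<>(topics));
    }

    @Override
    public Map<String, Assignment> assign(Cluster metadata,
                                          Map<String, Subscription> subscriptions) {
        // Collect all partitions of all subscribed topics.
        Set<String> topics = new TreeSet<>();
        for (Subscription s : subscriptions.values())
            topics.addAll(s.topics());
        List<TopicPartition> all = new ArrayList<>();
        for (String t : topics) {
            Integer n = metadata.partitionCountForTopic(t);
            for (int i = 0; n != null && i < n; i++)
                all.add(new TopicPartition(t, i));
        }

        List<String> members = new ArrayList<>(subscriptions.keySet());
        Collections.sort(members);
        Map<String, List<TopicPartition>> result = new HashMap<>();
        for (String m : members)
            result.put(m, new ArrayList<TopicPartition>());

        // Sticky pass: return each partition to its previous owner if present.
        List<TopicPartition> leftover = new ArrayList<>();
        for (TopicPartition tp : all) {
            String owner = previousOwner.get(tp);
            if (owner != null && result.containsKey(owner))
                result.get(owner).add(tp);
            else
                leftover.add(tp);
        }
        // Round-robin the rest across members.
        int i = 0;
        for (TopicPartition tp : leftover)
            result.get(members.get(i++ % members.size())).add(tp);

        // Remember ownership for the next rebalance.
        previousOwner.clear();
        Map<String, Assignment> out = new HashMap<>();
        for (Map.Entry<String, List<TopicPartition>> e : result.entrySet()) {
            for (TopicPartition tp : e.getValue())
                previousOwner.put(tp, e.getKey());
            out.put(e.getKey(), new Assignment(e.getValue()));
        }
        return out;
    }

    @Override
    public void onAssignment(Assignment assignment) { /* nothing to do here */ }
}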


Coordinator issues with 0.9 consumer

2015-11-18 Thread Martin Skøtt
Hi,

I'm tracking the 0.9.0.0 Git tag and have a Java consumer using the new
API, but I'm seeing some strange issues. I run ZooKeeper and Kafka on my
own machine using the settings files in config/ and no authentication.
Build is done using Oracle JDK 8. I have 13 topics, each created with a
replication factor of 1.

The first time my consumer connects to the newly created topics (before
anything has been written to the topic) it emits the following errors:

[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version :
0.9.0.0
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId :
fe855f98263cb7b8
[main] ERROR
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - Auto
offset commit failed.
org.apache.kafka.common.errors.GroupCoordinatorNotAvailableException: The
group coordinator is not available.
[main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator
- Marking the coordinator 2147483647 dead.
[main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator
- Attempt to join group xxx-yyy-reader failed due to obsolete coordinator
information, retrying.
[main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator
- Marking the coordinator 2147483647 dead.
[main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator
- Attempt to join group xxx-yyy-reader failed due to obsolete coordinator
information, retrying.

If I restart my consumer then it starts without issue.

When I start writing to the topics then my consumer reads data, but will
often log messages like these:

[main] ERROR
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - Auto
offset commit failed.
org.apache.kafka.clients.consumer.internals.SendFailedException

And occasionally these:
[main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator
- Attempt to heart beat failed since member id is not valid, reset it and
try to re-join group.
[main] ERROR
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - Error
ILLEGAL_GENERATION occurred while committing offsets for group
audience-indexer-reader
[main] ERROR
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - Auto
offset commit failed.
org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be
completed due to group rebalance
at
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:497)
at
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:438)
at
org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:673)
at
org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:646)
at
org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167)
at
org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
at
org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
at
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:350)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:288)
at
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:303)
at
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:197)
at
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:187)
at
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:157)
at
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:352)
at
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.maybeAutoCommitOffsetsSync(ConsumerCoordinator.java:384)
at
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinPrepare(ConsumerCoordinator.java:236)
at
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:209)
at
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.ensurePartitionAssignment(ConsumerCoordinator.java:304)
at
org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:861)
at
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:829)

Does anyone have any input on what might be happening?

-- 
Martin Skøtt
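
For what it's worth, one way to keep commit errors like these out of the
background auto-commit path is to turn auto-commit off and commit
explicitly after processing each batch; a minimal sketch against the 0.9
consumer API (topic, group, and processing logic illustrative):

import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.CommitFailedException;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ManualCommitLoop {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "xxx-yyy-reader");
        props.put("enable.auto.commit", "false"); // commit explicitly instead
        props.put("key.deserializer",
            "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
            "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Arrays.asList("some-topic")); // hypothetical topic
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(1000);
                for (ConsumerRecord<String, String> record : records)
                    process(record);
                try {
                    consumer.commitSync(); // commit what poll() returned
                } catch (CommitFailedException e) {
                    // A rebalance happened before the commit; the next poll()
                    // re-joins the group and work is re-fetched (at-least-once).
                }
            }
        }
    }

    private static void process(ConsumerRecord<String, String> record) {
        System.out.println(record.offset() + ": " + record.value());
    }
}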


Re: Is a Kafka 0.9 broker supposed to connect to itself?

2015-11-18 Thread Damian Guy
This was a brand new cluster, so 0 topics. Every broker had the same issue,
and it was all communication with itself. In any case, I deployed a later
cut and it started working.

Cheers,
Damian

On 18 November 2015 at 02:15, Jun Rao  wrote:

> There is inter-broker communication. It seems that the broker got a
> request larger than the default allowed size (104857600 bytes, ~100 MB).
> How many topics/partitions do you have on this cluster? Do you have
> clients running on the broker host?
>
> Thanks,
>
> Jun
>
>
> On Tue, Nov 17, 2015 at 4:10 AM, Damian Guy  wrote:
>
>> I would think not.
>> I'm bringing up a new 0.9 cluster and I'm getting the below exception (and
>> the same thing on all nodes). The IP address is the IP of the host the
>> broker is running on. I think DNS is a bit stuffed on these machines and
>> maybe that is the cause, but... any ideas?
>>
>> [2015-11-17 04:01:30,248] WARN Unexpected error from /10.137.231.233;
>> closing connection (org.apache.kafka.common.network.Selector)
>> org.apache.kafka.common.network.InvalidReceiveException: Invalid receive
>> (size = 1195725856 larger than 104857600)
>> at
>>
>> org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:91)
>> at
>>
>> org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:71)
>> at
>>
>> org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:160)
>> at
>> org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:141)
>> at
>> org.apache.kafka.common.network.Selector.poll(Selector.java:286)
>> at kafka.network.Processor.run(SocketServer.scala:413)
>> at java.lang.Thread.run(Thread.java:745)
>>
>
>
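
One quick sanity check on a wild size like this: the "size" is just the
first four bytes read off the socket, so it can be decoded as ASCII.
1195725856 decodes to "GET ", so a plausible explanation is a stray HTTP
client (a health check, browser, or load-balancer probe) hitting the Kafka
port, rather than inter-broker traffic:

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class SizeFieldCheck {
    public static void main(String[] args) {
        // The broker reads a 4-byte size prefix; whatever bytes arrived
        // first are interpreted as that size.
        byte[] first4 = ByteBuffer.allocate(4).putInt(1195725856).array();
        System.out.println(new String(first4, StandardCharsets.US_ASCII)); // "GET "
    }
}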


Re: Coordinator issues with 0.9 consumer

2015-11-18 Thread Jason Gustafson
Hi Martin,

Thanks for reporting this problem. I think maybe we're just not doing a
very good job of handling auto-commit errors internally and they end up
spilling into user logs. I added a JIRA to address this issue:
https://issues.apache.org/jira/browse/KAFKA-2860.

-Jason

On Wed, Nov 18, 2015 at 10:16 AM, Guozhang Wang  wrote:

> Hello Martin,
>
> Could you paste the consumer config values in this thread as well? And is
> the consumer co-located with the broker?
>
> Guozhang
>
> On Wed, Nov 18, 2015 at 7:40 AM, Martin Skøtt <
> martin.sko...@falconsocial.com> wrote:

Re: Coordinator issues with 0.9 consumer

2015-11-18 Thread Guozhang Wang
Hello Martin,

Could you paste the consumer config values in this thread as well? And is
the consumer co-located with the broker?

Guozhang

On Wed, Nov 18, 2015 at 7:40 AM, Martin Skøtt <
martin.sko...@falconsocial.com> wrote:


Re: Q about PartitionAssignor

2015-11-18 Thread Guozhang Wang
Currently the whole KafkaConsumer interface is tagged as
"@InterfaceStability.Unstable", meaning that the API may change in the
future. We have been very careful about making dramatic public API changes,
but we still cannot guarantee it will not happen.

Member-Id is assigned by the server-side coordinator upon accepting the
consumer into the specified group, hence it cannot be manually set. But the
memberId will not change as long as the consumer is still part of the
members of the group, so if you want to do some sticky assignment you can
just remember the memberId -> partitions map on the consumer side in some
persistent storage, so that even when the leader who does the assignment
has failed over, the new leader can still access the past assignment
history.

Guozhang



On Wed, Nov 18, 2015 at 9:02 AM, hsy...@gmail.com  wrote:




-- 
-- Guozhang


Re: Unable to create a topic from console producer even though topic creation is enabled

2015-11-18 Thread Kishore N C
It turns out that "auto.create.topics.enable=true" was actually getting
overridden to false somewhere else, which ended up causing this issue.
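
For anyone hitting the same thing: auto-creation is controlled by
broker-side settings in server.properties, and the layout of an
auto-created topic comes from the defaults there. A minimal excerpt (values
illustrative; check every config layer your deployment applies on top of
this file):

# server.properties
auto.create.topics.enable=true
# shape of auto-created topics:
num.partitions=2
default.replication.factor=2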

On Tue, Nov 3, 2015 at 8:19 PM, Artem Ervits  wrote:

> change the order of your commands
>
>  bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
>
>
> On Tue, Nov 3, 2015 at 7:23 AM, Kishore N C  wrote:
>
> > Hi all,
> >
> > I have a 3-node Kafka cluster. I'm running into the following error when
> > I try to use the console producer to write to a topic that does *not*
> > yet exist. I have ensured that "auto.create.topics.enable=true" in
> > server.properties.
> >
> > The error:
> >
> > ubuntu@ip-XX-X-XXX-XX:/usr/local/kafka$ bin/kafka-console-producer.sh
> > --topic sampletopic --broker-list localhost:9082
> > [2015-11-03 12:00:01,011] WARN Property topic is not valid
> > (kafka.utils.VerifiableProperties)
> > Hey
> > [2015-11-03 12:00:04,304] WARN Error while fetching metadata
> > [{TopicMetadata for topic sampletopic ->
> > No partition metadata for topic sampletopic due to
> > kafka.common.UnknownTopicOrPartitionException}] for topic [sampletopic]:
> > class kafka.common.UnknownTopicOrPartitionException
> > (kafka.producer.BrokerPartitionInfo)
> > [2015-11-03 12:00:04,312] WARN Error while fetching metadata
> > [{TopicMetadata for topic sampletopic ->
> > No partition metadata for topic sampletopic due to
> > kafka.common.UnknownTopicOrPartitionException}] for topic [sampletopic]:
> > class kafka.common.UnknownTopicOrPartitionException
> > (kafka.producer.BrokerPartitionInfo)
> > [2015-11-03 12:00:04,313] ERROR Failed to collate messages by topic,
> > partition due to: Failed to fetch topic metadata for topic: sampletopic
> > (kafka.producer.async.DefaultEventHandler)
> > ...
> >
> > Once I create the topic, I am able to write to it:
> >
> > ubuntu@ip-XX-X-XXX-XX:/usr/local/kafka$ bin/kafka-topics.sh --create
> > --zookeeper localhost:2181 --replication-factor 2 --partitions 2 --topic
> > sampletopic
> > Created topic "sampletopic".
> > ubuntu@ip-10-1-100-75:/usr/local/kafka$ bin/kafka-console-producer.sh
> > --topic sampletopic --broker-list localhost:9082
> > [2015-11-03 12:09:01,940] WARN Property topic is not valid
> > (kafka.utils.VerifiableProperties)
> > Hey
> > Hello
> > ^Cubuntu@ip-10-1-100-75:/usr/local/kafka$ bin/kafka-console-consumer.sh
> > --zookeeper localhost:2181 --from-beginning --topic sampletopic
> > Hey
> > Hello
> > ^CConsumed 2 messages
> >
> > Any idea what I am doing wrong or how I should troubleshoot this issue?
> >
> > Regards,
> >
> > KN.
> >
>



-- 
It is our choices that show what we truly are,
far more than our abilities.


GC Woes

2015-11-18 Thread Cory Kolbeck
Hi folks,

I've been chasing an issue for a bit now without much luck. We're seeing
occasional (1-2 times a day) pause times of 10+ seconds in a 0.8.2.0 broker
handling only ~3k messages/s. We're only seeing it on one node at a time in
a three-node cluster, though which node is affected can change
occasionally. We've tried G1 and ParNew/CMS with various heap sizes and
configurations without fixing the issue.

In digging into things, I found a somewhat odd thing: YourKit's allocation
tracking shows that ~98% (by both count and size) of objects allocated are
closures around string formatting, created in the calls to trace() in the
ReplicaFetcherThread, such as
https://github.com/apache/kafka/blob/0.8.2.0/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala#L52

Can anyone replicate this? I see that trace() guards printing internally
with a call to isTraceEnabled; would folks be amenable to explicitly
wrapping the calls there in isTraceEnabled, given that it's a decently
tight loop?
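
The guard in question, sketched in Java for concreteness (the linked Kafka
code is Scala, where the message argument still allocates a closure per
call even when TRACE is off; the same idea applies):

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TraceGuardSketch {
    private static final Logger log = LoggerFactory.getLogger(TraceGuardSketch.class);

    void onFetch(int followerId, long offset) {
        // Unguarded: the message is built on every call, even with TRACE off.
        log.trace(String.format("follower %d fetching from offset %d",
            followerId, offset));

        // Guarded: the formatting (and its allocations) only happens when
        // TRACE is actually enabled.
        if (log.isTraceEnabled()) {
            log.trace(String.format("follower %d fetching from offset %d",
                followerId, offset));
        }
    }
}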

Also, if anyone is willing to pitch ideas for GC configs or experiments to
try, I'm all ears.

Thanks,
Cory K


Re: Avro vs Protocol buffer for Kafka message

2015-11-18 Thread Selina Tech
Hi, all:


I'd like to add some information about Avro messages on Kafka: an Avro
message can include a schema ID instead of the full schema in each message.

http://stackoverflow.com/questions/31204201/apache-kafka-with-avro-and-schema-repo-where-in-the-message-does-the-schema-id
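
Concretely, the framing described in that link (as used by the Confluent
schema registry serializers) puts a 5-byte header in front of the Avro
binary body, so the schema itself never travels with the message; a sketch:

import java.nio.ByteBuffer;

public class SchemaIdFraming {
    // Assumed layout: 1 magic byte (0) + 4-byte big-endian schema ID + Avro body.
    static byte[] frame(int schemaId, byte[] avroBody) {
        ByteBuffer buf = ByteBuffer.allocate(1 + 4 + avroBody.length);
        buf.put((byte) 0);      // magic byte
        buf.putInt(schemaId);   // registry-assigned ID; readers fetch the schema by ID
        buf.put(avroBody);      // Avro binary carries no schema of its own
        return buf.array();
    }
}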

sincerely,
Selina

On Wed, Nov 18, 2015 at 3:49 PM, Selina Tech  wrote:



Commit offsets only work for subscribe(), not assign()

2015-11-18 Thread hsy...@gmail.com
In the new API, does the explicit commit-offset method call only work for a
subscribe() consumer, not an assign() consumer?

Best,
Siyuan


Re: Q about PartitionAssignor

2015-11-18 Thread Guozhang Wang
It is used to carry metadata that the leader wants to propagate to the
other members while doing the rebalance. For example, in Kafka Streams
userData contains the mapping of stream tasks to partition groups; in
Kafka Connect, different connectors can also use this field to fill in
app-specific assignments.

In fact, with userData you have another way of doing sticky partitioning:
set the full assignment in the userData so all members can remember the
whole mapping, and hence you do not need separate persistent storage for
leader failover.

Guozhang
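
A sketch of that idea: the userData fields on the 0.9 Subscription and
Assignment classes are plain ByteBuffers, so the encoding below is a
hypothetical choice, not a prescribed format.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.kafka.common.TopicPartition;

public class AssignmentUserData {
    // Pack a whole memberId -> partitions mapping into userData, so every
    // member carries the last full assignment into the next rebalance.
    static ByteBuffer encode(Map<String, List<TopicPartition>> assignment)
            throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        out.writeInt(assignment.size());
        for (Map.Entry<String, List<TopicPartition>> e : assignment.entrySet()) {
            out.writeUTF(e.getKey());              // memberId
            out.writeInt(e.getValue().size());
            for (TopicPartition tp : e.getValue()) {
                out.writeUTF(tp.topic());
                out.writeInt(tp.partition());
            }
        }
        out.flush();
        return ByteBuffer.wrap(bytes.toByteArray());
    }

    static Map<String, List<TopicPartition>> decode(ByteBuffer userData)
            throws IOException {
        byte[] data = new byte[userData.remaining()];
        userData.duplicate().get(data);
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
        Map<String, List<TopicPartition>> assignment = new HashMap<>();
        int members = in.readInt();
        for (int i = 0; i < members; i++) {
            String memberId = in.readUTF();
            int n = in.readInt();
            List<TopicPartition> parts = new ArrayList<>();
            for (int j = 0; j < n; j++)
                parts.add(new TopicPartition(in.readUTF(), in.readInt()));
            assignment.put(memberId, parts);
        }
        return assignment;
    }
}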


On Wed, Nov 18, 2015 at 3:25 PM, hsy...@gmail.com  wrote:

> Thanks Guozhang,  what is userData for in the Subscription?



-- 
-- Guozhang


Question on re-partition

2015-11-18 Thread Dillian Murphey
I want to change the partition assignment to spread the partitions across
two machines, since machine #1 is getting full on disk space.

I have Kafka Manager to make this easy. Is there any downtime to
re-assigning partitions? I assume Kafka builds up the new partitions and
then does a hitless switchover.

Thanks for any info.
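
For reference, outside Kafka Manager the stock tool for this is
kafka-reassign-partitions.sh; a sketch (topic name, broker ids, and file
paths illustrative). The tool adds the new replicas, copies the data in the
background, and drops the old replicas once the new ones are in sync, so
reads and writes keep flowing during the move, though the copying does add
network and disk load.

reassign.json:
{"version":1,"partitions":[
  {"topic":"mytopic","partition":0,"replicas":[1,2]},
  {"topic":"mytopic","partition":1,"replicas":[2,1]}
]}

bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 \
  --reassignment-json-file reassign.json --execute
bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 \
  --reassignment-json-file reassign.json --verify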


Re: Q about PartitionAssignor

2015-11-18 Thread hsy...@gmail.com
Thanks Guozhang,  what is userData for in the Subscription?

On Wed, Nov 18, 2015 at 12:05 PM, Guozhang Wang  wrote:



Re: Coordinator issues with 0.9 consumer

2015-11-18 Thread Martin Skøtt
Hi Guozhang,

The consumer, broker, and zookeeper are all on the same machine - just
testing out Kafka at the moment :)

Here are the configuration values I set:
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "xxx-yyy-reader");
properties.setProperty("session.timeout.ms", "3");
properties.setProperty("enable.auto.commit", "true");
properties.setProperty("auto.commit.interval.ms", "1");
properties.setProperty("key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
properties.setProperty("value.deserializer",
"xxxyyy.kafka.JsonDeserializer");
properties.setProperty("client.id", "xxx-yyy-consumer");

I guess they are fairly standard.

Martin

On 18 November 2015 at 19:16, Guozhang Wang  wrote:

> Hello Martin,
>
> Could you paste the consumer config values in this thread as well? And is
> the consumer co-located with the broker?
>
> Guozhang
>
> On Wed, Nov 18, 2015 at 7:40 AM, Martin Skøtt <
> martin.sko...@falconsocial.com> wrote:

Avro vs Protocol buffer for Kafka message

2015-11-18 Thread Selina Tech
Dear All:

  I need to generate some data by Samza to Kafka and then write to
Parquet formate file.  I was asked why I choose Avro type as my Samza
output to Kafka instead of Protocol Buffer. Since currently our data on
Kafka are all Protocol buffer type message.

  I explained that Avro encoded message has advantages such as, the
encoded size smaller, no extra code compile, implementation easier.  fast
to serialize/deserialize and supporting a lot language.  However some
people believe when encoded the Avro message take as much space as Protocol
buffer, but with schema, the size could be much bigger.

  I am wondering if there are any other advantages make you choose Avro
as your message type at Kafka? How you consider the data size for Avro vs
Protocol buffer?

Sincerely,
Selina


Re: Setting the consumer's offset

2015-11-18 Thread Yonghui Zhao
Hi,

How about this feature? Thanks.

> We do plan to allow the high level consumer to specify a starting offset
> in the future when we revisit the consumer design. Some of the details
> are described in
> https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Client+Re-Design
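
For context, the old high-level consumer keeps these offsets in ZooKeeper,
one znode per group/topic/partition (names here illustrative):

/consumers/<group.id>/offsets/<topic>/<partition>

With the consumers stopped first, as Jun notes below, an offset can be
inspected and changed from the ZooKeeper CLI:

get /consumers/my-group/offsets/mytopic/0
set /consumers/my-group/offsets/mytopic/0 42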

2013-04-02 22:45 GMT+08:00 Jun Rao :

> Currently, we have tools like ImportOffset that can update the starting
> offset of a consumer in ZK. However, one has to stop all consumers first.
>
> We do plan to allow the high level consumer to specify a starting offset in
> the future when we revisit the consumer design. Some of the details are
> described in
> https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Client+Re-Design
>
> Thanks,
>
> Jun
>
> On Tue, Apr 2, 2013 at 6:56 AM, James Englert  wrote:
>
> > I wrote some code here that resets the offset:
> >
> > http://mail-archives.apache.org/mod_mbox/kafka-users/201303.mbox/%3CCAPD5FitPZkn7Uu+hRY70p8NP=spTEm8zaBOEqKcTiVFB=oo...@mail.gmail.com%3E
> >
> > You just need to be sure to set this as the consumer(s) is starting up.
> > I'm not certain if it is the "right way" to do things.
> >
> > On Tue, Apr 2, 2013 at 12:46 AM, Jun Rao  wrote:
> >
> > > Sean,
> > >
> > > A broker can have multiple topics, each with multiple partitions. Each
> > > partition can be consumed by multiple consumers.
> > >
> > > Our high level consumer API doesn't allow you to specify a starting
> > > offset. SimpleConsumer does. If you use SimpleConsumer, you are
> > > responsible for managing the consumption of all partitions. Consumer
> > > group is only used in the high level consumer.
> > >
> > > Thanks,
> > >
> > > Jun
> > >
> > > On Mon, Apr 1, 2013 at 7:10 PM, Sean Grayson  wrote:
> > >
> > > > Hello,
> > > >
> > > > Hopefully I'm sending this question to the right place. I'm currently
> > > > trying to set up a consumer that will allow me to specify the offset,
> > > > partition, and consumer group ID all at the same time. This obviously
> > > > causes a dilemma since neither the low-level nor the high-level
> > > > consumer APIs seem to support all three. So I have a couple of
> > > > questions:
> > > >
> > > > 1) Am I correct in my understanding that there is a 1-to-1
> > > > relationship between brokers and partitions? Are multiple consumers
> > > > per partition supported? If not, I imagine I could just start up a
> > > > simple consumer per partition and that would give me what I want. If
> > > > so, then do simple consumers actually support consumer group IDs
> > > > without my knowledge?
> > > >
> > > > 2) Is it possible to specify a custom offset for the high-level
> > > > consumer API other than from the tail (autooffset.reset =
> > > > "smallest") or the head (autooffset.reset = "largest")? I know that
> > > > the offset for each consumer, topic, and partition relationship is
> > > > stored in a zookeeper ephemeral node. Would connecting to zookeeper
> > > > and changing all these offsets to the desired values give me what I
> > > > want? I have tried this with just one consumer using the high-level
> > > > api with one partition and one zookeeper host and it seemed to work
> > > > alright. But I'm wondering if changing these offsets would work
> > > > correctly and efficiently with multiple partitions and consumers
> > > > with the same ID, as well as multiple zookeeper hosts.
> > > >
> > > > Thanks,
> > > > Sean
> > > >
> > >
> >
> >
> >
> > --
> > Jim Englert
> > Gilt Groupe
> > 2 Park Ave South, 5th Floor
> > New York, NY 10011
> > M: 847-707-2942
> > Please accept my invitation to join Gilt:
> > http://www.giltgroupe.com/invite/jenglert
> >
>