RE: Regarding The Kafka Offset Management Issue In Direct Stream Approach.

2015-11-25 Thread Dave Ariens
Charan,

You may find this Gist useful for storing/retrieving offsets for Kafka topics:

https://gist.github.com/ariens/e6a39bc3dbeb11467e53


From: Cody Koeninger [c...@koeninger.org]
Sent: Friday, November 06, 2015 10:10 AM
To: users@kafka.apache.org
Subject: Re: Regarding The Kafka Offset Management Issue In Direct Stream 
Approach.

Questions about Spark-kafka integration are better directed to the Spark
user mailing list.

I'm not 100% sure what you're asking. The Spark createDirectStream API
will not store any offsets internally unless you enable checkpointing.
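
The usual pattern with the direct stream (per the Spark streaming-kafka
integration guide) is to read the offset ranges off each batch's RDD and store
them yourself. A rough Java sketch against the Spark 1.x API, using
OffsetRange/HasOffsetRanges from org.apache.spark.streaming.kafka and assuming
a stream created with KafkaUtils.createDirectStream:

stream.foreachRDD(new Function<JavaPairRDD<String, String>, Void>() {
    @Override
    public Void call(JavaPairRDD<String, String> rdd) {
        // the RDD handed to foreachRDD carries the Kafka offset ranges of this batch
        OffsetRange[] ranges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
        // process the batch first, then persist each range's topic, partition and
        // untilOffset (e.g. to MongoDB) so a restarted job can resume from them
        return null;  // foreachRDD's Java signature here is Function<..., Void>
    }
});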



On Sun, Nov 1, 2015 at 10:26 PM, Charan Ganga Phani Adabala <
char...@eiqnetworks.com> wrote:

> Hi All,
>
> We are working on Apache Spark with Kafka integration; in this use case we
> are using the DirectStream approach. To avoid data loss in this approach we
> currently take the offsets and save them into MongoDB.
>
> We want some clarification on whether Spark stores any offsets internally.
> Let us explain with an example:
>
> For the first RDD batch we get offsets 0 to 5 of events to be processed,
> but the application crashes unexpectedly. When we start the application
> again, does this job fetch events 0 to 5 again, or does it resume from
> where the previous job stopped?
>
> We are not committing any offsets in the above process, because offsets
> have to be committed manually in the DirectStream approach. Does the new
> job fetch events from the 0th position?
>
>
>
>
>
> Thanks & Regards,
>
> *Ganga Phani Charan Adabala | Software Engineer*
>
> o:  +91-40-23116680 | c:  +91-9491418099
>
> e:  char...@eiqnetworks.com
>
> *EiQ Networks®, Inc.* |  www.eiqnetworks.com
>
> *www.socvue.com * | www.eiqfederal.com
>
>
>


[ANNOUNCE] CFP open for ApacheCon North America 2016

2015-11-25 Thread Rich Bowen
Community growth starts by talking with those interested in your
project. ApacheCon North America is coming, are you?

We are delighted to announce that the Call For Presentations (CFP) is
now open for ApacheCon North America. You can submit your proposed
sessions at
http://events.linuxfoundation.org/events/apache-big-data-north-america/program/cfp
for big data talks and
http://events.linuxfoundation.org/events/apachecon-north-america/program/cfp
for all other topics.

ApacheCon North America will be held in Vancouver, Canada, May 9-13th
2016. ApacheCon has been running every year since 2000, and is the place
to build your project communities.

While we will consider individual talks, we prefer to see related
sessions that are likely to draw users and community members. When
submitting your talk, work with your project community and with related
communities to come up with a full program that will walk attendees
through the basics and on into mastery of your project in example use
cases. Content that introduces what's new in your latest release is also
of particular interest, especially when it builds upon existing well-known
application models. The goal should be to showcase your project in
ways that will attract participants and encourage engagement in your
community. Please remember to involve your whole project community (user
and dev lists) when building content. This is your chance to create a
project-specific event within the broader ApacheCon conference.

Content at ApacheCon North America will be cross-promoted as
mini-conferences, such as ApacheCon Big Data, and ApacheCon Mobile, so
be sure to indicate which larger category your proposed sessions fit into.

Finally, please plan to attend ApacheCon, even if you're not proposing a
talk. The biggest value of the event is community building, and we count
on you to make it a place where your project community is likely to
congregate, not just for the technical content in sessions, but for
hackathons, project summits, and good old-fashioned face-to-face networking.

-- 
rbo...@apache.org
http://apache.org/


Re: New kafka-consumer-groups.sh not showing inactive consumer group

2015-11-25 Thread Guozhang Wang
Hi Tao,

Yes, this is the intended behavior: it only queries the consumer
coordinator's in-memory cache of consumer group metadata, which only
contains active consumer groups; if a group has no live members any more it
is treated as dead and removed from the cache.

To get committed offsets from all groups that have not expired, you can use
the console consumer to fetch the offsets topic directly from Kafka:

bin/kafka-console-consumer.sh --consumer.config config/consumer.properties
--from-beginning --topic __consumer_offsets --zookeeper localhost:2181
--formatter
"kafka.coordinator.GroupMetadataManager\$OffsetsMessageFormatter"

Guozhang

On Wed, Nov 25, 2015 at 4:53 AM, tao xiao  wrote:

> Hi team,
>
> In 0.9.0.0 a new tool, kafka-consumer-groups.sh, is introduced to show offset
> and lag for a particular consumer group, preferred over the old
> kafka-consumer-offset-checker.sh. However, I found that the new tool
> only works for consumer groups that are currently active, not for
> consumer groups that previously connected to the broker.  I wonder if this is
> intended behavior. If yes, how do I query a non-active consumer group's
> committed offsets besides using kafka-consumer-offset-checker.sh?
>



-- 
-- Guozhang


mvn repository for kafka 0.9 jars

2015-11-25 Thread MrWanny
Hi,

is there any plan to publish the kafka jars to mvn repository?
http://mvnrepository.com/artifact/org.apache.kafka

currently, the latest version available is 0.8.2.2

Thanks

  Wanny


upgrade to 0.9 and auto generated broker id

2015-11-25 Thread Evgeniy Shishkin
Hello,

i have a question regarding upgrade to 0.9

Is it recommended to keep broker.id in config after upgrade?
Will there be any data loss or stuck replication if we remove broker.id and
restart the broker?

Thanks.

mvn repository for kafka 0.9 jars

2015-11-25 Thread MrWanny
Nice, switching to maven central.

Thanks a lot

Wanny

From: Jun Rao [j...@confluent.io]
Sent: Wednesday, November 25, 2015 8:07 AM
To: users@kafka.apache.org
Subject: Re: mvn repository for kafka 0.9 jars

Wanny,

The 0.9.0.0 Kafka jars are already in maven central (
http://central.maven.org/maven2/org/apache/kafka/kafka_2.10/0.9.0.0/).

Thanks,

Jun

On Wed, Nov 25, 2015 at 8:02 AM, MrWanny  wrote:

> Hi,
>
> is there any plan to publish the kafka jars to mvn repository?
> http://mvnrepository.com/artifact/org.apache.kafka
>
> currently, the latest version available is 0.8.2.2
>
> Thanks
>
>   Wanny
>

  Wanny


Re: 0.9.0.0[error]

2015-11-25 Thread Jun Rao
Fredo,

Thanks for reporting this. Are you starting a brand new 0.9.0.0 cluster?
Are there steps that one can follow to reproduce this issue easily?

Jun

On Tue, Nov 24, 2015 at 10:52 PM, Fredo Lee  wrote:

> The content below is the report for kafka
>
> when i try to fetch coordinator broker, i get 6 for ever.
>
>
>
> [2015-11-25 14:48:28,638] ERROR [KafkaApi-1] error when handling request
> Name: FetchRequest; Version: 1; CorrelationId: 643; ClientId:
> ReplicaFetcherThread-0-4; ReplicaId:
> 1; MaxWait: 500 ms; MinBytes: 1 bytes; RequestInfo: [__consumer_offsets,49]
> -> PartitionFetchInfo(0,1048576),[__consumer_offsets,17] ->
> PartitionFetchInfo(0,1048576),[__con
> sumer_offsets,29] -> PartitionFetchInfo(0,1048576),[blcs,6] ->
> PartitionFetchInfo(0,1048576),[__consumer_offsets,41] ->
> PartitionFetchInfo(0,1048576),[__consumer_offsets,13
> ] -> PartitionFetchInfo(0,1048576),[__consumer_offsets,5] ->
> PartitionFetchInfo(0,1048576),[__consumer_offsets,37] ->
> PartitionFetchInfo(0,1048576),[__consumer_offsets,25]
> -> PartitionFetchInfo(0,1048576),[__consumer_offsets,1] ->
> PartitionFetchInfo(0,1048576) (kafka.server.KafkaApis)
> kafka.common.KafkaException: Should not set log end offset on partition
> [__consumer_offsets,49]'s local replica 1
> at kafka.cluster.Replica.logEndOffset_$eq(Replica.scala:66)
> at kafka.cluster.Replica.updateLogReadResult(Replica.scala:53)
> at
> kafka.cluster.Partition.updateReplicaLogReadResult(Partition.scala:240)
> at
>
> kafka.server.ReplicaManager$$anonfun$updateFollowerLogReadResults$2.apply(ReplicaManager.scala:852)
> at
>
> kafka.server.ReplicaManager$$anonfun$updateFollowerLogReadResults$2.apply(ReplicaManager.scala:849)
> at
> scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:221)
> at
> scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:428)
> at
>
> kafka.server.ReplicaManager.updateFollowerLogReadResults(ReplicaManager.scala:849)
> at
> kafka.server.ReplicaManager.fetchMessages(ReplicaManager.scala:467)
> at kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:434)
> at kafka.server.KafkaApis.handle(KafkaApis.scala:69)
> at
> kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:60)
> at java.lang.Thread.run(Thread.java:722)
>


Re: Consumer group disappears and consumers loops

2015-11-25 Thread Guozhang Wang
Hello Martin,

It seems your consumer's heartbeat.interval.ms config value is too small
(the default is 3 seconds) for your environment; consider increasing it and
see if this issue goes away.
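
For reference, the new-consumer properties involved would be along these lines
(the values here are purely illustrative; heartbeat.interval.ms has to stay
below session.timeout.ms):

heartbeat.interval.ms=10000
session.timeout.ms=30000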

At the same time, we have some better error handling fixes in trunk which
will be included in the next point release.

https://issues.apache.org/jira/browse/KAFKA-2860

Guozhang



On Wed, Nov 25, 2015 at 6:54 AM, Martin Skøtt <
martin.sko...@falconsocial.com> wrote:

> Hi,
>
> I'm experiencing some very strange issues with 0.9. I get these log
> messages from the new consumer:
>
> [main] ERROR
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - Error
> ILLEGAL_GENERATION occurred while committing offsets for group
> aaa-bbb-reader
> [main] WARN org.apache.kafka.clients.consumer.internals.ConsumerCoordinator
> - Auto offset commit failed: Commit cannot be completed due to group
> rebalance
> [main] ERROR
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - Error
> ILLEGAL_GENERATION occurred while committing offsets for group
> aaa-bbb-reader
> [main] WARN org.apache.kafka.clients.consumer.internals.ConsumerCoordinator
> - Auto offset commit failed:
> [main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator
> - Attempt to join group aaa-bbb-reader failed due to unknown member id,
> resetting and retrying.
>
> And this in the broker log:
> [2015-11-25 15:41:01,542] INFO [GroupCoordinator 0]: Preparing to
> restabilize group aaa-bbb-reader with old generation 1
> (kafka.coordinator.GroupCoordinator)
> [2015-11-25 15:41:01,544] INFO [GroupCoordinator 0]:
> Group aaa-bbb-reader generation 1 is dead and removed
> (kafka.coordinator.GroupCoordinator)
> [2015-11-25 15:41:13,474] INFO [GroupCoordinator 0]: Preparing to
> restabilize group aaa-bbb-reader with old generation 0
> (kafka.coordinator.GroupCoordinator)
> [2015-11-25 15:41:13,475] INFO [GroupCoordinator 0]: Stabilized
> group aaa-bbb-reader generation 1 (kafka.coordinator.GroupCoordinator)
> [2015-11-25 15:41:13,477] INFO [GroupCoordinator 0]: Assignment received
> from leader for group aaa-bbb-reader for generation 1
> (kafka.coordinator.GroupCoordinator)
> [2015-11-25 15:41:43,478] INFO [GroupCoordinator 0]: Preparing to
> restabilize group aaa-bbb-reader with old generation 1
> (kafka.coordinator.GroupCoordinator)
> [2015-11-25 15:41:43,478] INFO [GroupCoordinator 0]:
> Group aaa-bbb-reader generation 1 is dead and removed
> (kafka.coordinator.GroupCoordinator)
>
> When this happens the kafka-consumer-groups describe command keeps saying
> that the group no longer exists or is rebalancing. What is probably even
> worse is that my consumers appear to be looping constantly through
> everything written to the topics!?
>
> Does anyone have any input on what might be happening?
>
> I'm running 0.9 locally on my laptop using one Zookeeper and one broker,
> both using the configuration provided in the distribution. I have 13 topics
> with two partitions each and a replication factor of 1. I run one producer
> and one consumer, also on the same machine.
>
> --
> Martin Skøtt
>



-- 
-- Guozhang


Re: Kafka Connect and Spark/Storm Comparisons

2015-11-25 Thread Cody Koeninger
Spark's direct stream kafka integration should take advantage of data
locality if you're running Spark executors on the same nodes as Kafka
brokers.

On Wed, Nov 25, 2015 at 9:50 AM, Dave Ariens  wrote:

> I just finished reading up on Kafka Connect<
> http://kafka.apache.org/documentation.html#connect> and am trying to wrap
> my head around where it fits within the big data ecosystem.
>
> Other than the high level overview provided in the docs I haven't heard
> much about this feature. My limited understanding of it so far is that it
> includes semantics similar to Storm (sources/spouts, sinks/bolts) and
> allows for distributed processing of streams using tasks that handle data
> defined in records conforming to a schema.  Assuming that's mostly
> accurate, is anyone able to speak to why a developer would want to use
> Kafka Connect over Spark (or maybe even Storm but to a lesser degree)?  Is
> Kafka Connect trying to address any shortcomings?  I understand it greatly
> simplifies offset persistence but that's not terribly difficult to
> implement on top of Spark (see my offset persistence hack<
> https://gist.github.com/ariens/e6a39bc3dbeb11467e53>).  Where is Kafka
> Connect being targeted within the vast ecosystem that is big data?
>
> Does Kafka Connect offer efficiencies 'under the hood' taking advantage of
> data locality and the fact that it distributes workload on the actual Kafka
> cluster itself?
>
> I can see basic ETL and data warehouse bulk operations simplified where
> one just wants an easy way to get all data in/out of Kafka and reduce the
> network IO of having multiple compute clusters but for any data science
> type operations (machine learning, etc) I would expect working with Spark's
> RDDs to be more efficient.
>
>
>
>
>
>
>
>
>
>
>


Re: upgrade to 0.9 and auto generated broker id

2015-11-25 Thread Jun Rao
broker.id is the identifier of the broker. If you have existing data, you
should preserve the broker.id. If you restart the broker with a new id, all
replicas stored with the old broker id are considered gone.

Thanks,

Jun

On Wed, Nov 25, 2015 at 7:20 AM, Evgeniy Shishkin 
wrote:

> Hello,
>
> i have a question regarding upgrade to 0.9
>
> Is it recommended to keep broker.id in config after upgrade?
> Will there be any data loss or stuck replication if we remove broker.id
> and restart the broker?
>
> Thanks.


flush() vs close()

2015-11-25 Thread Kashif Usmani
Hello,


I notice that the 0.9 Kafka release added a new flush() call to the
producer API (see https://issues.apache.org/jira/browse/KAFKA-1865).

I am trying to understand what is the difference between producer.flush()
and producer.close()? They seem to be doing pretty similar work. If not,
can someone please explain?

KAFKA-1865 mentions that flush() would ensure “that any record enqueued
prior to flush() would have completed being sent (either successfully or
not)”, and the javadocs for close() say “Close this producer. This
method blocks until all in-flight requests complete.” Does ‘in-flight
requests complete’ mean the same as enqueued records being sent to Kafka?



Kashif


RE: kafka 0.8 producer issue

2015-11-25 Thread Kudumula, Surender
Hi Prabhjot,
I just realized it's not producing to Kafka; that's why I am getting this null
pointer. On my localhost I can see Kafka info/debug logs, but in the HDP cluster
I cannot see any exceptions. How should I configure the logger to show exceptions
in the Kafka log files? Thanks.



-Original Message-
From: Prabhjot Bharaj [mailto:prabhbha...@gmail.com] 
Sent: 25 November 2015 17:35
To: users@kafka.apache.org
Cc: d...@kafka.apache.org
Subject: Re: kafka 0.8 producer issue

Hi,

From the information that you've provided, I think your callback is the culprit 
here, as can be seen from the stack trace:

at com.hpe.ssmamp.kafka.KafkaPDFAProducer$1.onCompletion(
KafkaPDFAProducer.java:62)

Please provide more information like a code snippet etc, so that we can tell 
more
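
In case it helps: a very common cause of an NPE inside onCompletion is touching
the RecordMetadata when the send failed, because it can be null in that case
while the exception is non-null. A minimal null-safe callback, purely as a
sketch (the producer/record/logger names are illustrative):

producer.send(record, new Callback() {
    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        if (exception != null) {
            // on failure, metadata may be null; use only the exception
            logger.error("send failed", exception);
        } else {
            logger.info("sent to " + metadata.topic() + "-" + metadata.partition()
                    + " at offset " + metadata.offset());
        }
    }
});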

Thanks,
Prabhjot

On Wed, Nov 25, 2015 at 9:24 PM, Kudumula, Surender < 
surender.kudum...@hpe.com> wrote:

> Hi all
> I am trying to get the producer working. It was working before, but now I am
> getting the following issue. I have created a new topic as well, just
> in case it was an issue with the topic, but still no luck. I have
> increased the message size on the broker as I am trying to send at least a
> 3 MB message here in byte array format. Any suggestions please?
>
> 2015-11-25 15:46:11 INFO  Login:185 - TGT refresh sleeping until: Tue 
> Dec
> 01 07:03:07 GMT 2015
> 2015-11-25 15:46:11 INFO  KafkaProducer:558 - Closing the Kafka 
> producer with timeoutMillis = 9223372036854775807 ms.
> 2015-11-25 15:46:12 ERROR RecordBatch:96 - Error executing 
> user-provided callback on message for topic-partition ResponseTopic-0:
> java.lang.NullPointerException
> at
> com.hpe.ssmamp.kafka.KafkaPDFAProducer$1.onCompletion(KafkaPDFAProducer.java:62)
> at
> org.apache.kafka.clients.producer.internals.RecordBatch.done(RecordBatch.java:93)
> at
> org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:285)
> at
> org.apache.kafka.clients.producer.internals.Sender.handleProduceResponse(Sender.java:253)
> at
> org.apache.kafka.clients.producer.internals.Sender.access$100(Sender.java:55)
> at
> org.apache.kafka.clients.producer.internals.Sender$1.onComplete(Sender.java:328)
> at
> org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:237)
> at
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:209)
> at
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:140)
> at java.lang.Thread.run(Thread.java:745)
>
>
>
>


--
-
"There are only 10 types of people in the world: Those who understand binary, 
and those who don't"


Re: All brokers are running but some partitions' leader is -1

2015-11-25 Thread Qi Xu
Great to know that. Thanks Gwen!

On Wed, Nov 25, 2015 at 12:03 PM, Gwen Shapira  wrote:

> 1. Yes, you can do a rolling upgrade of brokers from 0.8.2 to 0.9.0. The
> important thing is to upgrade the brokers before you upgrade any of the
> clients.
>
> 2. I'm not aware of issues with 0.9.0 and SparkStreaming. However,
> definitely do your own testing to make sure.
>
> On Wed, Nov 25, 2015 at 11:25 AM, Qi Xu  wrote:
>
> > Hi Gwen,
> > Yes, we're going to upgrade the 0.9.0 version. Regarding the upgrade, we
> > definitely don't want to have down time of our cluster.
> > So the upgrade will be machine by machine. Will the release 0.9.0 work
> with
> > the Aug's version together in the same Kafka cluster?
> > Also we currently run spark streaming job (with scala 2.10) against the
> > cluster. Any known issues of 0.9.0 are you aware of under this scenario?
> >
> > Thanks,
> > Tony
> >
> >
> > On Mon, Nov 23, 2015 at 5:41 PM, Gwen Shapira  wrote:
> >
> > > We fixed many many bugs since August. Since we are about to release
> 0.9.0
> > > (with SSL!), maybe wait a day and go with a released and tested
> version.
> > >
> > > On Mon, Nov 23, 2015 at 3:01 PM, Qi Xu  wrote:
> > >
> > > > Forgot to mention is that the Kafka version we're using is from Aug's
> > > > Trunk branch---which has the SSL support.
> > > >
> > > > Thanks again,
> > > > Qi
> > > >
> > > >
> > > > On Mon, Nov 23, 2015 at 2:29 PM, Qi Xu  wrote:
> > > >
> > > >> Loop another guy from our team.
> > > >>
> > > >> On Mon, Nov 23, 2015 at 2:26 PM, Qi Xu  wrote:
> > > >>
> > > >>> Hi folks,
> > > >>> We have a 10 node cluster and have several topics. Each topic has
> > about
> > > >>> 256 partitions with 3 replica factor. Now we run into an issue that
> > in
> > > some
> > > >>> topic, a few partition (< 10)'s leader is -1 and all of them has
> only
> > > one
> > > >>> synced partition.
> > > >>>
> > > >>> From the Kafka manager, here's the snapshot:
> > > >>> [image: Inline image 2]
> > > >>>
> > > >>> [image: Inline image 1]
> > > >>>
> > > >>> here's the state log:
> > > >>> [2015-11-23 21:57:58,598] ERROR Controller 1 epoch 435499 initiated
> > > >>> state change for partition [userlogs,84] from OnlinePartition to
> > > >>> OnlinePartition failed (state.change.logger)
> > > >>> kafka.common.StateChangeFailedException: encountered error while
> > > >>> electing leader for partition [userlogs,84] due to: Preferred
> replica
> > > 0 for
> > > >>> partition [userlogs,84] is either not alive or not in the isr.
> > Current
> > > >>> leader and ISR: [{"leader":-1,"leader_epoch":203,"isr":[1]}].
> > > >>> Caused by: kafka.common.StateChangeFailedException: Preferred
> > replica 0
> > > >>> for partition [userlogs,84] is either not alive or not in the isr.
> > > Current
> > > >>> leader and ISR: [{"leader":-1,"leader_epoch":203,"isr":[1]}]
> > > >>>
> > > >>> My question is:
> > > >>> 1) how could this happen and how can I fix it or work around it?
> > > >>> 2) Is 256 partitions too big? We have about 200+ cores for spark
> > > >>> streaming job.
> > > >>>
> > > >>> Thanks,
> > > >>> Qi
> > > >>>
> > > >>>
> > > >>
> > > >
> > >
> >
>


Re: Kafka Broker Max Connection

2015-11-25 Thread 马哲超
I haven't found it in the documentation.

2015-11-25 19:40 GMT+08:00 Muqtafi Akhmad :

> but is there any config that limits connections 'globally'? Is it only
> limited by the host's maximum number of open file descriptors?
>
> Thank you,
>
> On Tue, Nov 24, 2015 at 6:53 PM, 马哲超  wrote:
>
> >  There is a configuration for broker, "max.connections.per.ip", and
> default
> > is Int.MaxValue.
> >
> > Here's the doc:
> https://kafka.apache.org/documentation.html#brokerconfigs
> >
> > 2015-11-24 18:55 GMT+08:00 Muqtafi Akhmad :
> >
> > > Hello guys, currently I am trying to figure out some things about Kafka
> > > broker connections, there are two things I wondering :
> > > 1. Is there any limit to number of Kafka broker connection?
> > > 2. is there any configuration to limit the number of Kafka broker
> > > connection?
> > >
> > > Any clue or information is appreciated,
> > >
> > > Thank you,
> > >
> > > --
> > > Muqtafi Akhmad
> > > Software Engineer
> > > Traveloka
> > >
> >
>
>
>
> --
> Muqtafi Akhmad
> Software Engineer
> Traveloka
>


Re: Flush Messages in KafkaProducer Buffer

2015-11-25 Thread Gwen Shapira
In 0.9.0, close() has a timeout parameter that allows specifying how long
to wait for the in-flight messages to complete (definition of complete
depends on value of "acks" parameter).
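
Roughly, with the 0.9.0 producer API (just a sketch; TimeUnit is
java.util.concurrent.TimeUnit and the 30-second value is only an example):

producer.flush();                      // block until every record already in the
                                       // buffer has completed, successfully or not
producer.close(30, TimeUnit.SECONDS);  // then close, waiting up to 30s for
                                       // in-flight requests to finish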

On Wed, Nov 25, 2015 at 3:58 AM, Muqtafi Akhmad 
wrote:

> Hello guys,
>
> I am using KafkaProducer (org.apache.kafka.clients.producer.KafkaProducer)
> to send messages to Kafka broker. I set BUFFER_MEMORY_CONFIG to some MBs.
> If there is a case where we need to shutdown kafka producer when there are
> messages in producer's buffer, is there any way to flush these messages to
> kafka to prevent event loss? will be calling the close() method do the
> trick?
>
> Thank you,
>
> --
> Muqtafi Akhmad
> Software Engineer
> Traveloka
>


Re: Kafka broker goes down when consumer is stopped.

2015-11-25 Thread Gwen Shapira
It looks like you have a single broker (with id = 0 ) and that topic1 has a
single replica and the broker is alive and well.

The socket error is our bug (shouldn't be an error) and doesn't indicate
that the broker is down.

On Wed, Nov 25, 2015 at 3:26 AM, Shaikh, Mazhar A (Mazhar) <
mazhar.sha...@in.verizon.com> wrote:

> Hi Team,
>
> In my test setup, Kafka broker goes down when consumer is stopped.
>
> Below are the version & logs.
>
> I request your help to solve this issue.
>
>
> Kafka Version : kafka_2.10-0.8.2.1
>
> C++ application using   librdkafka_2.10-0.8.2.1 library for consumer &
> producer.
>
>
>
> server.log:
>
>
>
> [2015-11-25 05:10:12,798] INFO Closing socket connection to /
> 192.168.108.140. (kafka.network.Processor)
>
> [2015-11-25 05:10:12,798] INFO Closing socket connection to /
> 192.168.108.140. (kafka.network.Processor)
>
> [2015-11-25 05:10:12,798] ERROR Closing socket for /192.168.108.140
> because of error (kafka.network.Processor)
>
> java.io.IOException: Connection reset by peer
>
> at sun.nio.ch.FileDispatcher.read0(Native Method)
>
> at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
>
> at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:251)
>
> at sun.nio.ch.IOUtil.read(IOUtil.java:224)
>
> at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:254)
>
> at kafka.utils.Utils$.read(Utils.scala:380)
>
> at
> kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
>
> at kafka.network.Processor.read(SocketServer.scala:444)
>
> at kafka.network.Processor.run(SocketServer.scala:340)
>
> at java.lang.Thread.run(Thread.java:701)
>
> [2015-11-25 05:33:58,456] INFO Verifying properties
> (kafka.utils.VerifiableProperties)
>
>
> # bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic topic1
> Topic:topic1PartitionCount:16   ReplicationFactor:1 Configs:
> Topic: topic1   Partition: 0Leader: 0   Replicas: 0
>  Isr: 0
> Topic: topic1   Partition: 1Leader: 0   Replicas: 0
>  Isr: 0
> Topic: topic1   Partition: 2Leader: 0   Replicas: 0
>  Isr: 0
> Topic: topic1   Partition: 3Leader: 0   Replicas: 0
>  Isr: 0
> Topic: topic1   Partition: 4Leader: 0   Replicas: 0
>  Isr: 0
> Topic: topic1   Partition: 5Leader: 0   Replicas: 0
>  Isr: 0
> Topic: topic1   Partition: 6Leader: 0   Replicas: 0
>  Isr: 0
> Topic: topic1   Partition: 7Leader: 0   Replicas: 0
>  Isr: 0
> Topic: topic1   Partition: 8Leader: 0   Replicas: 0
>  Isr: 0
> Topic: topic1   Partition: 9Leader: 0   Replicas: 0
>  Isr: 0
> Topic: topic1   Partition: 10   Leader: 0   Replicas: 0
>  Isr: 0
> Topic: topic1   Partition: 11   Leader: 0   Replicas: 0
>  Isr: 0
> Topic: topic1   Partition: 12   Leader: 0   Replicas: 0
>  Isr: 0
> Topic: topic1   Partition: 13   Leader: 0   Replicas: 0
>  Isr: 0
> Topic: topic1   Partition: 14   Leader: 0   Replicas: 0
>  Isr: 0
> Topic: topic1   Partition: 15   Leader: 0   Replicas: 0
>  Isr: 0
>
> Regards,
> Mazhar Shaikh.
>


Re: Flush Messages in KafkaProducer Buffer

2015-11-25 Thread Muqtafi Akhmad
Hello Gwen,

Currently I am using 0.8.2; is it recommended to upgrade to 0.9.0? Will it
be backward compatible with 0.8.2 brokers?
Is there any way to 'recover' buffered messages in case there is an exception
when calling the close() method?

Thank you,

On Thu, Nov 26, 2015 at 8:26 AM, Gwen Shapira  wrote:

> In 0.9.0, close() has a timeout parameter that allows specifying how long
> to wait for the in-flight messages to complete (definition of complete
> depends on value of "acks" parameter).
>
> On Wed, Nov 25, 2015 at 3:58 AM, Muqtafi Akhmad 
> wrote:
>
> > Hello guys,
> >
> > I am using KafkaProducer
> (org.apache.kafka.clients.producer.KafkaProducer)
> > to send messages to Kafka broker. I set BUFFER_MEMORY_CONFIG to some MBs.
> > If there is a case where we need to shutdown kafka producer when there
> are
> > messages in producer's buffer, is there any way to flush these messages
> to
> > kafka to prevent event loss? will be calling the close() method do the
> > trick?
> >
> > Thank you,
> >
> > --
> > Muqtafi Akhmad
> > Software Engineer
> > Traveloka
> >
>



-- 
Muqtafi Akhmad
Software Engineer
Traveloka


Re: 0.9.0.0[error]

2015-11-25 Thread Fredo Lee
This is my config file (the original file with some changes made by me).

broker.id=1
listeners=PLAINTEXT://:9092
num.partitions=10
log.dirs=/tmp/kafka-logs1
zookeeper.connect=localhost:2181
zookeeper.connection.timeout.ms=2000
delete.topic.enable=true
default.replication.factor=2
auto.leader.rebalance.enable=true


If I change listeners to 9093, it works. There is no process running on
this port!!
I do not know why.


2015-11-25 23:58 GMT+08:00 Jun Rao :

> Fredo,
>
> Thanks for reporting this. Are you starting a brand new 0.9.0.0 cluster?
> Are there steps that one can follow to reproduce this issue easily?
>
> Jun
>
> On Tue, Nov 24, 2015 at 10:52 PM, Fredo Lee 
> wrote:
>
> > The content below is the report for kafka
> >
> > when i try to fetch coordinator broker, i get 6 for ever.
> >
> >
> >
> > [2015-11-25 14:48:28,638] ERROR [KafkaApi-1] error when handling request
> > Name: FetchRequest; Version: 1; CorrelationId: 643; ClientId:
> > ReplicaFetcherThread-0-4; ReplicaId:
> > 1; MaxWait: 500 ms; MinBytes: 1 bytes; RequestInfo:
> [__consumer_offsets,49]
> > -> PartitionFetchInfo(0,1048576),[__consumer_offsets,17] ->
> > PartitionFetchInfo(0,1048576),[__con
> > sumer_offsets,29] -> PartitionFetchInfo(0,1048576),[blcs,6] ->
> > PartitionFetchInfo(0,1048576),[__consumer_offsets,41] ->
> > PartitionFetchInfo(0,1048576),[__consumer_offsets,13
> > ] -> PartitionFetchInfo(0,1048576),[__consumer_offsets,5] ->
> > PartitionFetchInfo(0,1048576),[__consumer_offsets,37] ->
> > PartitionFetchInfo(0,1048576),[__consumer_offsets,25]
> > -> PartitionFetchInfo(0,1048576),[__consumer_offsets,1] ->
> > PartitionFetchInfo(0,1048576) (kafka.server.KafkaApis)
> > kafka.common.KafkaException: Should not set log end offset on partition
> > [__consumer_offsets,49]'s local replica 1
> > at kafka.cluster.Replica.logEndOffset_$eq(Replica.scala:66)
> > at kafka.cluster.Replica.updateLogReadResult(Replica.scala:53)
> > at
> > kafka.cluster.Partition.updateReplicaLogReadResult(Partition.scala:240)
> > at
> >
> >
> kafka.server.ReplicaManager$$anonfun$updateFollowerLogReadResults$2.apply(ReplicaManager.scala:852)
> > at
> >
> >
> kafka.server.ReplicaManager$$anonfun$updateFollowerLogReadResults$2.apply(ReplicaManager.scala:849)
> > at
> > scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:221)
> > at
> > scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:428)
> > at
> >
> >
> kafka.server.ReplicaManager.updateFollowerLogReadResults(ReplicaManager.scala:849)
> > at
> > kafka.server.ReplicaManager.fetchMessages(ReplicaManager.scala:467)
> > at kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:434)
> > at kafka.server.KafkaApis.handle(KafkaApis.scala:69)
> > at
> > kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:60)
> > at java.lang.Thread.run(Thread.java:722)
> >
>


Re: All brokers are running but some partitions' leader is -1

2015-11-25 Thread Qi Xu
Hi Gwen,
Yes, we're going to upgrade to the 0.9.0 version. Regarding the upgrade, we
definitely don't want to have downtime of our cluster.
So the upgrade will be machine by machine. Will the 0.9.0 release work
together with the August trunk version in the same Kafka cluster?
Also, we currently run a Spark Streaming job (with Scala 2.10) against the
cluster. Are there any known issues of 0.9.0 you are aware of under this
scenario?

Thanks,
Tony


On Mon, Nov 23, 2015 at 5:41 PM, Gwen Shapira  wrote:

> We fixed many many bugs since August. Since we are about to release 0.9.0
> (with SSL!), maybe wait a day and go with a released and tested version.
>
> On Mon, Nov 23, 2015 at 3:01 PM, Qi Xu  wrote:
>
> > Forgot to mention is that the Kafka version we're using is from Aug's
> > Trunk branch---which has the SSL support.
> >
> > Thanks again,
> > Qi
> >
> >
> > On Mon, Nov 23, 2015 at 2:29 PM, Qi Xu  wrote:
> >
> >> Loop another guy from our team.
> >>
> >> On Mon, Nov 23, 2015 at 2:26 PM, Qi Xu  wrote:
> >>
> >>> Hi folks,
> >>> We have a 10 node cluster and have several topics. Each topic has about
> >>> 256 partitions with 3 replica factor. Now we run into an issue that in
> some
> >>> topic, a few partition (< 10)'s leader is -1 and all of them has only
> one
> >>> synced partition.
> >>>
> >>> From the Kafka manager, here's the snapshot:
> >>> [image: Inline image 2]
> >>>
> >>> [image: Inline image 1]
> >>>
> >>> here's the state log:
> >>> [2015-11-23 21:57:58,598] ERROR Controller 1 epoch 435499 initiated
> >>> state change for partition [userlogs,84] from OnlinePartition to
> >>> OnlinePartition failed (state.change.logger)
> >>> kafka.common.StateChangeFailedException: encountered error while
> >>> electing leader for partition [userlogs,84] due to: Preferred replica
> 0 for
> >>> partition [userlogs,84] is either not alive or not in the isr. Current
> >>> leader and ISR: [{"leader":-1,"leader_epoch":203,"isr":[1]}].
> >>> Caused by: kafka.common.StateChangeFailedException: Preferred replica 0
> >>> for partition [userlogs,84] is either not alive or not in the isr.
> Current
> >>> leader and ISR: [{"leader":-1,"leader_epoch":203,"isr":[1]}]
> >>>
> >>> My question is:
> >>> 1) how could this happen and how can I fix it or work around it?
> >>> 2) Is 256 partitions too big? We have about 200+ cores for spark
> >>> streaming job.
> >>>
> >>> Thanks,
> >>> Qi
> >>>
> >>>
> >>
> >
>


Re: All brokers are running but some partitions' leader is -1

2015-11-25 Thread Gwen Shapira
1. Yes, you can do a rolling upgrade of brokers from 0.8.2 to 0.9.0. The
important thing is to upgrade the brokers before you upgrade any of the
clients.

2. I'm not aware of issues with 0.9.0 and SparkStreaming. However,
definitely do your own testing to make sure.
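
Regarding (1), the 0.9.0 upgrade notes describe the documented 0.8.2 -> 0.9.0
path as a rolling bounce driven by one server.properties setting (a sketch
only; see the upgrade section of the docs for the exact steps):

# step 1: set this on every broker, then bounce them one at a time onto 0.9.0 code
inter.broker.protocol.version=0.8.2.X
# step 2: once all brokers run 0.9.0, raise the version and do one more rolling restart
inter.broker.protocol.version=0.9.0.0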

On Wed, Nov 25, 2015 at 11:25 AM, Qi Xu  wrote:

> Hi Gwen,
> Yes, we're going to upgrade the 0.9.0 version. Regarding the upgrade, we
> definitely don't want to have down time of our cluster.
> So the upgrade will be machine by machine. Will the release 0.9.0 work with
> the Aug's version together in the same Kafka cluster?
> Also we currently run spark streaming job (with scala 2.10) against the
> cluster. Any known issues of 0.9.0 are you aware of under this scenario?
>
> Thanks,
> Tony
>
>
> On Mon, Nov 23, 2015 at 5:41 PM, Gwen Shapira  wrote:
>
> > We fixed many many bugs since August. Since we are about to release 0.9.0
> > (with SSL!), maybe wait a day and go with a released and tested version.
> >
> > On Mon, Nov 23, 2015 at 3:01 PM, Qi Xu  wrote:
> >
> > > Forgot to mention is that the Kafka version we're using is from Aug's
> > > Trunk branch---which has the SSL support.
> > >
> > > Thanks again,
> > > Qi
> > >
> > >
> > > On Mon, Nov 23, 2015 at 2:29 PM, Qi Xu  wrote:
> > >
> > >> Loop another guy from our team.
> > >>
> > >> On Mon, Nov 23, 2015 at 2:26 PM, Qi Xu  wrote:
> > >>
> > >>> Hi folks,
> > >>> We have a 10 node cluster and have several topics. Each topic has
> about
> > >>> 256 partitions with 3 replica factor. Now we run into an issue that
> in
> > some
> > >>> topic, a few partition (< 10)'s leader is -1 and all of them has only
> > one
> > >>> synced partition.
> > >>>
> > >>> From the Kafka manager, here's the snapshot:
> > >>> [image: Inline image 2]
> > >>>
> > >>> [image: Inline image 1]
> > >>>
> > >>> here's the state log:
> > >>> [2015-11-23 21:57:58,598] ERROR Controller 1 epoch 435499 initiated
> > >>> state change for partition [userlogs,84] from OnlinePartition to
> > >>> OnlinePartition failed (state.change.logger)
> > >>> kafka.common.StateChangeFailedException: encountered error while
> > >>> electing leader for partition [userlogs,84] due to: Preferred replica
> > 0 for
> > >>> partition [userlogs,84] is either not alive or not in the isr.
> Current
> > >>> leader and ISR: [{"leader":-1,"leader_epoch":203,"isr":[1]}].
> > >>> Caused by: kafka.common.StateChangeFailedException: Preferred
> replica 0
> > >>> for partition [userlogs,84] is either not alive or not in the isr.
> > Current
> > >>> leader and ISR: [{"leader":-1,"leader_epoch":203,"isr":[1]}]
> > >>>
> > >>> My question is:
> > >>> 1) how could this happen and how can I fix it or work around it?
> > >>> 2) Is 256 partitions too big? We have about 200+ cores for spark
> > >>> streaming job.
> > >>>
> > >>> Thanks,
> > >>> Qi
> > >>>
> > >>>
> > >>
> > >
> >
>


Re: Kafka Connect and Spark/Storm Comparisons

2015-11-25 Thread Jay Kreps
Hey Dave,

We're separating the problem of getting data in and out of Kafka from the
problem of transforming it. If you think about ETL (Extract, Transform,
Load), what Kafka Connect does is E and L really really well and not T at
all; the focus in stream processing systems is T with E and L being a bit
of a necessary evil. If you are trying to get a single stream of data for
one application, directly using Storm or Spark with appropriate plugins is
totally reasonable. If you are trying to capture a bunch of different data
sources for multiple uses these systems get really awkward really fast.

Imagine a world in which you wanted to capture a significant portion of
what happened in your company as real-time streams and where there are many
things that use data. You could theoretically set up a Storm or Spark job
for each database table purely for the purpose of loading data but managing
this would be a bit of a nightmare. I think this is where Kafka Connect
really shines.

The other advantage of this is that transformation of data is inherently a
deep problem that is close to the programmer. There is lots of room here
for query languages, frameworks in different languages, etc. On the other
hand ingress and egress is much more well defined problem.

So the approach we're building towards is one where data is captured more
or less as it is, at large scale, and then is available for further
transformation or loading into many other systems. The transformation would
be the role of the stream processing systems and the loading and unloading
the role of Kafka Connect.

The advantage Kafka Connect has is the following:
- No additional cluster is needed, it directly co-ordinates with the Kafka
cluster
- It does a good job of capturing schema information from sources if it is
present
- It does a good job of handling scalable data capture--if you want to add
a new table to the set of things you're pulling data from that is just a
simple REST call not another job to manually configure and manage.
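
To make the last point concrete, registering a new connector is a single call
against Connect's REST interface; a sketch using the file source connector that
ships with 0.9.0 (host, file and topic names are illustrative):

curl -X POST -H "Content-Type: application/json" \
  --data '{"name": "local-file-source",
           "config": {"connector.class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
                      "tasks.max": "1", "file": "/tmp/input.txt", "topic": "connect-test"}}' \
  http://connect-host:8083/connectors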

Hope that sheds some light on things.

-Jay

On Wed, Nov 25, 2015 at 7:50 AM, Dave Ariens  wrote:

> I just finished reading up on Kafka Connect<
> http://kafka.apache.org/documentation.html#connect> and am trying to wrap
> my head around where it fits within the big data ecosystem.
>
> Other than the high level overview provided in the docs I haven't heard
> much about this feature. My limited understanding of it so far is that it
> includes semantics similar to Storm (sources/spouts, sinks/bolts) and
> allows for distributed processing of streams using tasks that handle data
> defined in records conforming to a schema.  Assuming that's mostly
> accurate, is anyone able to speak to why a developer would want to use
> Kafka Connect over Spark (or maybe even Storm but to a lesser degree)?  Is
> Kafka Connect trying to address any shortcomings?  I understand it greatly
> simplifies offset persistence but that's not terribly difficult to
> implement on top of Spark (see my offset persistence hack<
> https://gist.github.com/ariens/e6a39bc3dbeb11467e53>).  Where is Kafka
> Connect being targeted within the vast ecosystem that is big data?
>
> Does Kafka Connect offer efficiencies 'under the hood' taking advantage of
> data locality and the fact that it distributes workload on the actual Kafka
> cluster itself?
>
> I can see basic ETL and data warehouse bulk operations simplified where
> one just wants an easy way to get all data in/out of Kafka and reduce the
> network IO of having multiple compute clusters but for any data science
> type operations (machine learning, etc) I would expect working with Spark's
> RDDs to be more efficient.
>
>
>
>
>
>
>
>
>
>
>


Kafka 0.8.2.1 - how to read from __consumer_offsets topic?

2015-11-25 Thread Marina
Hello,

I'm trying to find out which offsets my current High-Level consumers are 
working off. I use Kafka 0.8.2.1, with **no** "offset.storage" set in the 
server.properties of Kafka - which, I think, means that offsets are stored in 
Kafka. (I also verified that no offsets are stored in Zookeeper by checking 
this path in the Zk shell: 
**/consumers/<group>/offsets/<topic>/** )

I tried to listen to the **__consumer_offsets** topic to see which consumer 
saves what value of offsets, but it did not work... 

I tried the following:

created a config file for the console consumer as follows:


=> more kafka_offset_consumer.config

 exclude.internal.topics=false

and tried two versions of the console consumer scripts:

#1:
bin/kafka-console-consumer.sh --consumer.config 
kafka_offset_consumer.config --topic __consumer_offsets --zookeeper 
localhost:2181

#2
./bin/kafka-simple-consumer-shell.sh --topic __consumer_offsets --partition 
0 --broker-list localhost:9092 --formatter 
"kafka.server.OffsetManager\$OffsetsMessageFormatter" --consumer.config 
kafka_offset_consumer.config


Neither worked - it just sits there but does not print anything, even though 
the consumers are actively consuming/saving offsets.

Am I missing some other configuration/properties ?

thanks!

Marina

I have also posted this question on StackOverflow:

http://stackoverflow.com/questions/33925866/kafka-0-8-2-1-how-to-read-from-consumer-offsets-topic


Consumer group disappears and consumers loops

2015-11-25 Thread Martin Skøtt
Hi,

I'm experiencing some very strange issues with 0.9. I get these log
messages from the new consumer:

[main] ERROR
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - Error
ILLEGAL_GENERATION occurred while committing offsets for group
aaa-bbb-reader
[main] WARN org.apache.kafka.clients.consumer.internals.ConsumerCoordinator
- Auto offset commit failed: Commit cannot be completed due to group
rebalance
[main] ERROR
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - Error
ILLEGAL_GENERATION occurred while committing offsets for group
aaa-bbb-reader
[main] WARN org.apache.kafka.clients.consumer.internals.ConsumerCoordinator
- Auto offset commit failed:
[main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator
- Attempt to join group aaa-bbb-reader failed due to unknown member id,
resetting and retrying.

And this in the broker log:
[2015-11-25 15:41:01,542] INFO [GroupCoordinator 0]: Preparing to
restabilize group aaa-bbb-reader with old generation 1
(kafka.coordinator.GroupCoordinator)
[2015-11-25 15:41:01,544] INFO [GroupCoordinator 0]:
Group aaa-bbb-reader generation 1 is dead and removed
(kafka.coordinator.GroupCoordinator)
[2015-11-25 15:41:13,474] INFO [GroupCoordinator 0]: Preparing to
restabilize group aaa-bbb-reader with old generation 0
(kafka.coordinator.GroupCoordinator)
[2015-11-25 15:41:13,475] INFO [GroupCoordinator 0]: Stabilized
group aaa-bbb-reader generation 1 (kafka.coordinator.GroupCoordinator)
[2015-11-25 15:41:13,477] INFO [GroupCoordinator 0]: Assignment received
from leader for group aaa-bbb-reader for generation 1
(kafka.coordinator.GroupCoordinator)
[2015-11-25 15:41:43,478] INFO [GroupCoordinator 0]: Preparing to
restabilize group aaa-bbb-reader with old generation 1
(kafka.coordinator.GroupCoordinator)
[2015-11-25 15:41:43,478] INFO [GroupCoordinator 0]:
Group aaa-bbb-reader generation 1 is dead and removed
(kafka.coordinator.GroupCoordinator)

When this happens the kafka-consumer-groups describe command keeps saying
that the group no longer exists or is rebalancing. What is probably even
worse is that my consumers appear to be looping constantly through
everything written to the topics!?

Does anyone have any input on what might be happening?

I'm running 0.9 locally on my laptop using one Zookeeper and one broker,
both using the configuration provided in the distribution. I have 13 topics
with two partitions each and a replication factor of 1. I run one producer
and one consumer, also on the same machine.

-- 
Martin Skøtt


Re: Kafka broker goes down when consumer is stopped.

2015-11-25 Thread Gaurav Agarwal
Did you check with ps -ef whether the Kafka broker is running or not?
On Nov 25, 2015 4:56 PM, "Shaikh, Mazhar A (Mazhar)" <
mazhar.sha...@in.verizon.com> wrote:

> Hi Team,
>
> In my test setup, Kafka broker goes down when consumer is stopped.
>
> Below are the version & logs.
>
> I request your help to solve this issue.
>
>
> Kafka Version : kafka_2.10-0.8.2.1
>
> C++ application using   librdkafka_2.10-0.8.2.1 library for consumer &
> producer.
>
>
>
> server.log:
>
>
>
> [2015-11-25 05:10:12,798] INFO Closing socket connection to /
> 192.168.108.140. (kafka.network.Processor)
>
> [2015-11-25 05:10:12,798] INFO Closing socket connection to /
> 192.168.108.140. (kafka.network.Processor)
>
> [2015-11-25 05:10:12,798] ERROR Closing socket for /192.168.108.140
> because of error (kafka.network.Processor)
>
> java.io.IOException: Connection reset by peer
>
> at sun.nio.ch.FileDispatcher.read0(Native Method)
>
> at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
>
> at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:251)
>
> at sun.nio.ch.IOUtil.read(IOUtil.java:224)
>
> at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:254)
>
> at kafka.utils.Utils$.read(Utils.scala:380)
>
> at
> kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
>
> at kafka.network.Processor.read(SocketServer.scala:444)
>
> at kafka.network.Processor.run(SocketServer.scala:340)
>
> at java.lang.Thread.run(Thread.java:701)
>
> [2015-11-25 05:33:58,456] INFO Verifying properties
> (kafka.utils.VerifiableProperties)
>
>
> # bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic topic1
> Topic:topic1PartitionCount:16   ReplicationFactor:1 Configs:
> Topic: topic1   Partition: 0Leader: 0   Replicas: 0
>  Isr: 0
> Topic: topic1   Partition: 1Leader: 0   Replicas: 0
>  Isr: 0
> Topic: topic1   Partition: 2Leader: 0   Replicas: 0
>  Isr: 0
> Topic: topic1   Partition: 3Leader: 0   Replicas: 0
>  Isr: 0
> Topic: topic1   Partition: 4Leader: 0   Replicas: 0
>  Isr: 0
> Topic: topic1   Partition: 5Leader: 0   Replicas: 0
>  Isr: 0
> Topic: topic1   Partition: 6Leader: 0   Replicas: 0
>  Isr: 0
> Topic: topic1   Partition: 7Leader: 0   Replicas: 0
>  Isr: 0
> Topic: topic1   Partition: 8Leader: 0   Replicas: 0
>  Isr: 0
> Topic: topic1   Partition: 9Leader: 0   Replicas: 0
>  Isr: 0
> Topic: topic1   Partition: 10   Leader: 0   Replicas: 0
>  Isr: 0
> Topic: topic1   Partition: 11   Leader: 0   Replicas: 0
>  Isr: 0
> Topic: topic1   Partition: 12   Leader: 0   Replicas: 0
>  Isr: 0
> Topic: topic1   Partition: 13   Leader: 0   Replicas: 0
>  Isr: 0
> Topic: topic1   Partition: 14   Leader: 0   Replicas: 0
>  Isr: 0
> Topic: topic1   Partition: 15   Leader: 0   Replicas: 0
>  Isr: 0
>
> Regards,
> Mazhar Shaikh.
>


Re: kafka 0.8 producer issue

2015-11-25 Thread Gaurav Agarwal
Can you share the code that you are using to publish the message? Also, can
you check whether a small message is published?
On Nov 25, 2015 9:25 PM, "Kudumula, Surender" 
wrote:

> Hi all
> I am trying to get the producer working. It was working before, but now I am
> getting the following issue. I have created a new topic as well, just in
> case it was an issue with the topic, but still no luck. I have increased the
> message size on the broker as I am trying to send at least a 3 MB message
> here in byte array format. Any suggestions please?
>
> 2015-11-25 15:46:11 INFO  Login:185 - TGT refresh sleeping until: Tue Dec
> 01 07:03:07 GMT 2015
> 2015-11-25 15:46:11 INFO  KafkaProducer:558 - Closing the Kafka producer
> with timeoutMillis = 9223372036854775807 ms.
> 2015-11-25 15:46:12 ERROR RecordBatch:96 - Error executing user-provided
> callback on message for topic-partition ResponseTopic-0:
> java.lang.NullPointerException
> at
> com.hpe.ssmamp.kafka.KafkaPDFAProducer$1.onCompletion(KafkaPDFAProducer.java:62)
> at
> org.apache.kafka.clients.producer.internals.RecordBatch.done(RecordBatch.java:93)
> at
> org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:285)
> at
> org.apache.kafka.clients.producer.internals.Sender.handleProduceResponse(Sender.java:253)
> at
> org.apache.kafka.clients.producer.internals.Sender.access$100(Sender.java:55)
> at
> org.apache.kafka.clients.producer.internals.Sender$1.onComplete(Sender.java:328)
> at
> org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:237)
> at
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:209)
> at
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:140)
> at java.lang.Thread.run(Thread.java:745)
>
>
>
>


Re: Increasing replication factor reliable?

2015-11-25 Thread Gaurav Agarwal
So you have two nodes running and you want to increase the replication
factor to 2 for fault tolerance. That won't be a problem.
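
The linked doc section does this with kafka-reassign-partitions.sh and a JSON
file that simply lists the desired replica set per partition; a sketch with
illustrative topic and broker ids:

# increase-replication.json
{"version":1,
 "partitions":[{"topic":"my-topic","partition":0,"replicas":[1,2]}]}

bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 \
  --reassignment-json-file increase-replication.json --execute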
On Nov 25, 2015 6:26 AM, "Dillian Murphey"  wrote:

> Is it safe to run this on an active production topic?  A topic was created
> with a replication factor of 1 and I want to increase it from 1 to 2 to
> have fault tolerance.
>
>
> http://kafka.apache.org/documentation.html#basic_ops_increase_replication_factor
>


Re: kafka producer and client use same groupid

2015-11-25 Thread Gaurav Agarwal
Can you check a couple of things:
1. The message size that you are sending; try sending a small message.
2. Check which encoder (DefaultEncoder or StringEncoder) you are using to
consume the message; are you serializing the message while sending a normal
stream?
3. Did you create any partitions for the topic? Just create only the topic,
with no extra partitions.
On Nov 24, 2015 11:38 PM, "Kudumula, Surender" 
wrote:

> Hi all
> Is there any way we can ensure in 0.8 that the Kafka remote producer and remote
> consumer work on the same groupId, as my Java consumer cannot consume
> messages from the remote producer? Thanks
>
>
>
>


Re: 0.9.0.0[error]

2015-11-25 Thread Fredo Lee
I think it has nothing to do with those clients.

Actually I wrote a consumer client in the Erlang programming language,
but I have not used it yet.

I just used the script kafka-topics.sh to create a topic named blcs, and then
this error was reported.

2015-11-26 13:37 GMT+08:00 Jun Rao :

> Are you running any non-java client, especially a consumer?
>
> Thanks,
>
> Jun
>
> On Wed, Nov 25, 2015 at 6:38 PM, Fredo Lee 
> wrote:
>
> > this is my config file for original file with some changed by me.
> >
> > broker.id=1
> > listeners=PLAINTEXT://:9092
> > num.partitions=10
> > log.dirs=/tmp/kafka-logs1
> > zookeeper.connect=localhost:2181
> > zookeeper.connection.timeout.ms=2000
> > delete.topic.enable=true
> > default.replication.factor=2
> > auto.leader.rebalance.enable=true
> >
> >
> > if i change listeners to 9093, it works There is no process running
> on
> > this port!!
> > i donot know why
> >
> >
> > 2015-11-25 23:58 GMT+08:00 Jun Rao :
> >
> > > Fredo,
> > >
> > > Thanks for reporting this. Are you starting a brand new 0.9.0.0
> cluster?
> > > Are there steps that one can follow to reproduce this issue easily?
> > >
> > > Jun
> > >
> > > On Tue, Nov 24, 2015 at 10:52 PM, Fredo Lee 
> > > wrote:
> > >
> > > > The content below is the report for kafka
> > > >
> > > > when i try to fetch coordinator broker, i get 6 for ever.
> > > >
> > > >
> > > >
> > > > [2015-11-25 14:48:28,638] ERROR [KafkaApi-1] error when handling
> > request
> > > > Name: FetchRequest; Version: 1; CorrelationId: 643; ClientId:
> > > > ReplicaFetcherThread-0-4; ReplicaId:
> > > > 1; MaxWait: 500 ms; MinBytes: 1 bytes; RequestInfo:
> > > [__consumer_offsets,49]
> > > > -> PartitionFetchInfo(0,1048576),[__consumer_offsets,17] ->
> > > > PartitionFetchInfo(0,1048576),[__con
> > > > sumer_offsets,29] -> PartitionFetchInfo(0,1048576),[blcs,6] ->
> > > > PartitionFetchInfo(0,1048576),[__consumer_offsets,41] ->
> > > > PartitionFetchInfo(0,1048576),[__consumer_offsets,13
> > > > ] -> PartitionFetchInfo(0,1048576),[__consumer_offsets,5] ->
> > > > PartitionFetchInfo(0,1048576),[__consumer_offsets,37] ->
> > > > PartitionFetchInfo(0,1048576),[__consumer_offsets,25]
> > > > -> PartitionFetchInfo(0,1048576),[__consumer_offsets,1] ->
> > > > PartitionFetchInfo(0,1048576) (kafka.server.KafkaApis)
> > > > kafka.common.KafkaException: Should not set log end offset on
> partition
> > > > [__consumer_offsets,49]'s local replica 1
> > > > at kafka.cluster.Replica.logEndOffset_$eq(Replica.scala:66)
> > > > at
> kafka.cluster.Replica.updateLogReadResult(Replica.scala:53)
> > > > at
> > > >
> kafka.cluster.Partition.updateReplicaLogReadResult(Partition.scala:240)
> > > > at
> > > >
> > > >
> > >
> >
> kafka.server.ReplicaManager$$anonfun$updateFollowerLogReadResults$2.apply(ReplicaManager.scala:852)
> > > > at
> > > >
> > > >
> > >
> >
> kafka.server.ReplicaManager$$anonfun$updateFollowerLogReadResults$2.apply(ReplicaManager.scala:849)
> > > > at
> > > >
> scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:221)
> > > > at
> > > >
> > scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:428)
> > > > at
> > > >
> > > >
> > >
> >
> kafka.server.ReplicaManager.updateFollowerLogReadResults(ReplicaManager.scala:849)
> > > > at
> > > > kafka.server.ReplicaManager.fetchMessages(ReplicaManager.scala:467)
> > > > at
> > kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:434)
> > > > at kafka.server.KafkaApis.handle(KafkaApis.scala:69)
> > > > at
> > > > kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:60)
> > > > at java.lang.Thread.run(Thread.java:722)
> > > >
> > >
> >
>


Notable failure scenarios in high-level/new consumer

2015-11-25 Thread Prabhjot Bharaj
Hello Folks,

I am trying to build fault tolerance on the consumer side, so as to make
sure that all failure scenarios are handled.
On the data integrity side, there are primarily two requirements:

1. No Data loss
2. No data duplication

I'm particularly interested in data duplication. For example, there are
various steps, in the following order, that will happen on the consumer
during each consume cycle:

1. connect
2. consume
3. write offset back to zookeeper/kafka (0.8/0.9)
4. process the message (which will be done by other code, not the
consumer API)

Please correct the above steps if I'm wrong

Now, failures (machine down/process down/unhandled exceptions or bugs) can
occur at each of the above steps.
In particular, if a failure occurs after consuming the message and before
writing the offset back to ZooKeeper/Kafka, then on restart of the consumer
the same message could be reconsumed, leading to duplication of this message
if the 4th step is asynchronous.
E.g. if processing the message happens before writing back the offset, it
could cause data duplication after the consumer restarts!
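
For concreteness, the "process first, commit after" ordering with the 0.9
consumer (auto commit disabled) would look roughly like this; process() is
just a placeholder for step 4:

while (running) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records) {
        process(record);       // step 4
    }
    consumer.commitSync();     // step 3: crash before this line and the whole batch
                               // is redelivered after restart (duplication, not loss)
}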

Is this a valid scenario ?
Also, are there any other scenarios that need to be taken into
consideration when consuming ?


Thanks,
Prabhjot


Re: Kafka Broker Max Connection

2015-11-25 Thread Muqtafi Akhmad
but is there any config that limits connections 'globally'? Is it only
limited by the host's maximum number of open file descriptors?

Thank you,

On Tue, Nov 24, 2015 at 6:53 PM, 马哲超  wrote:

>  There is a configuration for broker, "max.connections.per.ip", and default
> is Int.MaxValue.
>
> Here's the doc: https://kafka.apache.org/documentation.html#brokerconfigs
>
> 2015-11-24 18:55 GMT+08:00 Muqtafi Akhmad :
>
> > Hello guys, currently I am trying to figure out some things about Kafka
> > broker connections, there are two things I wondering :
> > 1. Is there any limit to number of Kafka broker connection?
> > 2. is there any configuration to limit the number of Kafka broker
> > connection?
> >
> > Any clue or information is appreciated,
> >
> > Thank you,
> >
> > --
> > Muqtafi Akhmad
> > Software Engineer
> > Traveloka
> >
>



-- 
Muqtafi Akhmad
Software Engineer
Traveloka


Flush Messages in KafkaProducer Buffer

2015-11-25 Thread Muqtafi Akhmad
Hello guys,

I am using KafkaProducer (org.apache.kafka.clients.producer.KafkaProducer)
to send messages to Kafka broker. I set BUFFER_MEMORY_CONFIG to some MBs.
If there is a case where we need to shutdown kafka producer when there are
messages in producer's buffer, is there any way to flush these messages to
kafka to prevent event loss? will be calling the close() method do the
trick?

Thank you,

-- 
Muqtafi Akhmad
Software Engineer
Traveloka


Question for consumer

2015-11-25 Thread Hahn Jiang
I use kafka 0.8.1.1 with Java.

There are many partitions in my service. Each partition has a producer or a
consumer through a state control it.
When state is master, i will stop consumer and shutdown the
ConsumerConnecter.
When state is slave, i will create a new ConsumerConnecter.
So i must have many connections to zookeeper. Now I want to create multiple
ConsumerConnecter with only one zk connect.


Does Kafka have some way to implement this?

Thanks


Is SimpleConsumer Thread Safe?

2015-11-25 Thread Muqtafi Akhmad
Hello guys,

Is it safe to build a SimpleConsumer pool that is accessed by many threads?
Is SimpleConsumer thread safe?

Thank you,

-- 
Muqtafi Akhmad
Software Engineer
Traveloka


New kafka-consumer-groups.sh not showing inactive consumer group

2015-11-25 Thread tao xiao
Hi team,

In 0.9.0.0 a new tool, kafka-consumer-groups.sh, is introduced to show the
offset and lag for a particular consumer group, and it is preferred over the
old kafka-consumer-offset-checker.sh. However, I found that the new tool only
works for consumer groups that are currently active, not for every consumer
group that has ever connected to the broker. I wonder if this is intended
behavior. If yes, how do I query a non-active consumer group's committed
offsets besides using kafka-consumer-offset-checker.sh?


Re: 0.9.0.0[error]

2015-11-25 Thread Muqtafi Akhmad
Hello Fredo,

Can you provide your program's code? There might be some clues in it.

On Wed, Nov 25, 2015 at 2:27 PM, Fredo Lee  wrote:

> With four Kafka nodes, I get these errors;
> with one node, it works well.
>
> 2015-11-25 14:52 GMT+08:00 Fredo Lee :
>
> >
> > The content below is the error report from Kafka.
> >
> > When I try to fetch the coordinator broker, I get 6 forever.
> >
> >
> >
> > [2015-11-25 14:48:28,638] ERROR [KafkaApi-1] error when handling request
> > Name: FetchRequest; Version: 1; CorrelationId: 643; ClientId:
> > ReplicaFetcherThread-0-4; ReplicaId:
> > 1; MaxWait: 500 ms; MinBytes: 1 bytes; RequestInfo:
> > [__consumer_offsets,49] ->
> > PartitionFetchInfo(0,1048576),[__consumer_offsets,17] ->
> > PartitionFetchInfo(0,1048576),[__con
> > sumer_offsets,29] -> PartitionFetchInfo(0,1048576),[blcs,6] ->
> > PartitionFetchInfo(0,1048576),[__consumer_offsets,41] ->
> > PartitionFetchInfo(0,1048576),[__consumer_offsets,13
> > ] -> PartitionFetchInfo(0,1048576),[__consumer_offsets,5] ->
> > PartitionFetchInfo(0,1048576),[__consumer_offsets,37] ->
> > PartitionFetchInfo(0,1048576),[__consumer_offsets,25]
> > -> PartitionFetchInfo(0,1048576),[__consumer_offsets,1] ->
> > PartitionFetchInfo(0,1048576) (kafka.server.KafkaApis)
> > kafka.common.KafkaException: Should not set log end offset on partition
> > [__consumer_offsets,49]'s local replica 1
> > at kafka.cluster.Replica.logEndOffset_$eq(Replica.scala:66)
> > at kafka.cluster.Replica.updateLogReadResult(Replica.scala:53)
> > at
> > kafka.cluster.Partition.updateReplicaLogReadResult(Partition.scala:240)
> > at
> >
> kafka.server.ReplicaManager$$anonfun$updateFollowerLogReadResults$2.apply(ReplicaManager.scala:852)
> > at
> >
> kafka.server.ReplicaManager$$anonfun$updateFollowerLogReadResults$2.apply(ReplicaManager.scala:849)
> > at
> > scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:221)
> > at
> > scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:428)
> > at
> >
> kafka.server.ReplicaManager.updateFollowerLogReadResults(ReplicaManager.scala:849)
> > at
> > kafka.server.ReplicaManager.fetchMessages(ReplicaManager.scala:467)
> > at kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:434)
> > at kafka.server.KafkaApis.handle(KafkaApis.scala:69)
> > at
> > kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:60)
> > at java.lang.Thread.run(Thread.java:722)
> >
>



-- 
Muqtafi Akhmad
Software Engineer
Traveloka


Re: kafka producer and client use same groupid

2015-11-25 Thread Muqtafi Akhmad
Hello Surender,

do you use the high-level consumer, the low-level consumer, or a connector
provided by Spark / Hadoop?
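
If it is the 0.8 high-level consumer, one common culprit is auto.offset.reset
(default "largest"), which makes a consumer started after the messages were
produced see nothing. A minimal sketch for comparison - the topic, group id
and ZooKeeper address below are placeholders:

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;

Properties props = new Properties();
props.put("zookeeper.connect", "zk-host:2181");  // placeholder ZooKeeper address
props.put("group.id", "my-group");               // placeholder group id
props.put("auto.offset.reset", "smallest");      // start from the earliest offset

ConsumerConnector connector =
    Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
Map<String, List<KafkaStream<byte[], byte[]>>> streams =
    connector.createMessageStreams(Collections.singletonMap("my-topic", 1));
for (MessageAndMetadata<byte[], byte[]> msg : streams.get("my-topic").get(0)) {
    System.out.println(new String(msg.message()));  // replace with real processing
}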

On Wed, Nov 25, 2015 at 1:29 AM, Kudumula, Surender <
surender.kudum...@hpe.com> wrote:

> Hi Prabhjot
> Thanks for your quick response. I never get any response to my queries:( .
> Anyway my issue here is I have my kafka running as part of hdp cluster. I
> have my producer in the same cluster producing to a topic in kafka and
> consumer which is on another node trying to consume the messages from the
> same topic. The issue I have here is when I run the command line consumer
> it consumes all the messages whereas my java client consumer doesn’t
> consume a single message. Any ideas please?
>
>
> -Original Message-
> From: Prabhjot Bharaj [mailto:prabhbha...@gmail.com]
> Sent: 24 November 2015 18:24
> To: d...@kafka.apache.org
> Cc: users@kafka.apache.org
> Subject: Re: kafka producer and client use same groupid
>
> Hi Surender,
>
> Please elaborate on your design.
> Consumers don't talk to producers directly: Kafka is a brokered system,
> and Kafka sits between producers and consumers. Also, consumers consume
> from partitions of a topic and producers write to partitions of a topic.
> These partitions and the logical abstraction - the topic - reside on Kafka
> and ZooKeeper respectively.
>
> Thanks,
> Prabhjot
> On Nov 24, 2015 11:38 PM, "Kudumula, Surender" 
> wrote:
>
> > Hi all
> > Is there any way we can ensure in 0.8 that a remote Kafka producer and a
> > remote consumer work on the same groupId, as my Java consumer cannot
> > consume messages from the remote producer? Thanks
> >
> >
> >
> >
>



-- 
Muqtafi Akhmad
Software Engineer
Traveloka


Re: message order problem

2015-11-25 Thread Muqtafi Akhmad
This behavior might be expected; quoting the description in the Apache Kafka
documentation:

Kafka only provides a total order over messages within a partition, not
> between different partitions in a topic. Per-partition ordering combined
> with the ability to partition data by key is sufficient for most
> applications. However, if you require a total order over messages this can
> be achieved with a topic that has only one partition, though this will mean
> only one consumer process per consumer group.
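
For a single producer this is essentially the pattern Guozhang describes
below - block on each send so the second message only leaves after the first
is acked. A minimal sketch, assuming the new Java producer; the topic and
partition key are placeholders:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "1");  // wait for the leader to acknowledge each send
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

KafkaProducer<String, String> producer = new KafkaProducer<>(props);
// get() blocks until the broker responds, so msg2 cannot overtake msg1;
// get() throws checked exceptions that a real caller must handle or declare.
producer.send(new ProducerRecord<>("my-topic", "partition-key", "msg1")).get();
producer.send(new ProducerRecord<>("my-topic", "partition-key", "msg2")).get();
producer.close();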


On Wed, Nov 25, 2015 at 8:54 AM, Guozhang Wang  wrote:

> With ack=1, if your code does something like:
>
> producer1.send(msg1).get() // get() blocks until a response is received.
> producer2.send(msg2).get() // get() blocks until a response is received.
>
> Then the ordering is guaranteed though under a broker failure the acked
> messages may be lost if they have not been replicated.
>
> Guozhang
>
>
>
> On Mon, Nov 23, 2015 at 11:08 PM, Yonghui Zhao 
> wrote:
>
> > Thanks Guozhang. Most params are not set in our config, so max retries
> > should be 3 by default.
> >
> > In your explanation:
> >
> > a. msg1 sent, produce() returned.
> > b. msg2 sent.
> > c. msg1 failed, and retried.
> > d. msg2 acked.
> > e. msg1 acked.
> >
> > But if acks is 0, "the retries configuration will not take effect (as
> > the client won't generally know of any failures)."
> >
> >
> > And I find that in the new producer to be released, the default value of
> > acks has already been changed to 1.
> >
> >
> > If acks is set to 1, will the order be maintained even if they are sent
> > from different producers?
> >
> >
> >
> > 2015-11-24 2:47 GMT+08:00 Guozhang Wang :
> >
> > > Yonghui,
> > >
> > > With ack = 0 and retries > 0 this could happen since producer.send()
> > > returns before it gets acked from the broker, so you could get the
> > > following trace:
> > >
> > > a. msg1 sent, produce() returned.
> > > b. msg2 sent.
> > > c. msg1 failed, and retried.
> > > d. msg2 acked.
> > > e. msg1 acked.
> > >
> > > Assuming, as you said, that "msg1 and msg2 may not be sent by one
> > > producer", out-of-order delivery is even more likely, as the messages
> > > could arrive at the broker from different sockets in arbitrary order.
> > >
> > > Guozhang
> > >
> > > On Mon, Nov 23, 2015 at 4:31 AM, Yonghui Zhao 
> > > wrote:
> > >
> > > > The ack mode is default value 0.
> > > >
> > > >
> > > > - 0, which means that the producer never waits for an acknowledgement
> > > >   from the broker (the same behavior as 0.7). This option provides the
> > > >   lowest latency but the weakest durability guarantees (some data will
> > > >   be lost when a server fails).
> > > >
> > > > And msg1 and msg2 may not be sent by one broker.
> > > >
> > > >
> > > > 2015-11-21 7:56 GMT+08:00 Guozhang Wang :
> > > >
> > > > > Yonghui,
> > > > >
> > > > > What is the ack mode for the producer clients? And are msg1 and
> msg2
> > > sent
> > > > > by the same producer?
> > > > >
> > > > > Guozhang
> > > > >
> > > > > On Thu, Nov 19, 2015 at 10:59 PM, Yonghui Zhao <
> > zhaoyong...@gmail.com>
> > > > > wrote:
> > > > >
> > > > > > Broker setting is: 8 partitions, 1 replica, Kafka version 0.8.1
> > > > > >
> > > > > > We send 2 messages at almost the same time.
> > > > > >
> > > > > > Msg1 first, Msg2 second.
> > > > > >
> > > > > > We have more than 1 producer, in sync mode.
> > > > > >
> > > > > > We may send msg1 to one broker, and *after producer.send returns a
> > > > > > response*, send msg2 to the other broker.
> > > > > >
> > > > > > Both messages have the same partition key.
> > > > > >
> > > > > > On the consumer side, we find the 2 messages are in the same
> > > > > > partition as expected, but the order is inverted: msg2, msg1.
> > > > > >
> > > > > > Is this possible with Kafka?
> > > > > >
> > > > >
> > > > >
> > > > >
> > > > > --
> > > > > -- Guozhang
> > > > >
> > > >
> > >
> > >
> > >
> > > --
> > > -- Guozhang
> > >
> >
>
>
>
> --
> -- Guozhang
>



-- 
Muqtafi Akhmad
Software Engineer
Traveloka


Re: 0.9.0.0[error]

2015-11-25 Thread Jun Rao
Are you running any non-Java client, especially a consumer?

Thanks,

Jun

On Wed, Nov 25, 2015 at 6:38 PM, Fredo Lee  wrote:

> This is my config file: the original file, with some changes made by me.
>
> broker.id=1
> listeners=PLAINTEXT://:9092
> num.partitions=10
> log.dirs=/tmp/kafka-logs1
> zookeeper.connect=localhost:2181
> zookeeper.connection.timeout.ms=2000
> delete.topic.enable=true
> default.replication.factor=2
> auto.leader.rebalance.enable=true
>
>
> If I change listeners to 9093, it works. There is no process running on
> this port!!
> I do not know why.
>
>
> 2015-11-25 23:58 GMT+08:00 Jun Rao :
>
> > Fredo,
> >
> > Thanks for reporting this. Are you starting a brand new 0.9.0.0 cluster?
> > Are there steps that one can follow to reproduce this issue easily?
> >
> > Jun
> >
> > On Tue, Nov 24, 2015 at 10:52 PM, Fredo Lee 
> > wrote:
> >
> > > The content below is the error report from Kafka.
> > >
> > > When I try to fetch the coordinator broker, I get 6 forever.
> > >
> > >
> > >
> > > [2015-11-25 14:48:28,638] ERROR [KafkaApi-1] error when handling
> request
> > > Name: FetchRequest; Version: 1; CorrelationId: 643; ClientId:
> > > ReplicaFetcherThread-0-4; ReplicaId:
> > > 1; MaxWait: 500 ms; MinBytes: 1 bytes; RequestInfo:
> > [__consumer_offsets,49]
> > > -> PartitionFetchInfo(0,1048576),[__consumer_offsets,17] ->
> > > PartitionFetchInfo(0,1048576),[__con
> > > sumer_offsets,29] -> PartitionFetchInfo(0,1048576),[blcs,6] ->
> > > PartitionFetchInfo(0,1048576),[__consumer_offsets,41] ->
> > > PartitionFetchInfo(0,1048576),[__consumer_offsets,13
> > > ] -> PartitionFetchInfo(0,1048576),[__consumer_offsets,5] ->
> > > PartitionFetchInfo(0,1048576),[__consumer_offsets,37] ->
> > > PartitionFetchInfo(0,1048576),[__consumer_offsets,25]
> > > -> PartitionFetchInfo(0,1048576),[__consumer_offsets,1] ->
> > > PartitionFetchInfo(0,1048576) (kafka.server.KafkaApis)
> > > kafka.common.KafkaException: Should not set log end offset on partition
> > > [__consumer_offsets,49]'s local replica 1
> > > at kafka.cluster.Replica.logEndOffset_$eq(Replica.scala:66)
> > > at kafka.cluster.Replica.updateLogReadResult(Replica.scala:53)
> > > at
> > > kafka.cluster.Partition.updateReplicaLogReadResult(Partition.scala:240)
> > > at
> > >
> > >
> >
> kafka.server.ReplicaManager$$anonfun$updateFollowerLogReadResults$2.apply(ReplicaManager.scala:852)
> > > at
> > >
> > >
> >
> kafka.server.ReplicaManager$$anonfun$updateFollowerLogReadResults$2.apply(ReplicaManager.scala:849)
> > > at
> > > scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:221)
> > > at
> > >
> scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:428)
> > > at
> > >
> > >
> >
> kafka.server.ReplicaManager.updateFollowerLogReadResults(ReplicaManager.scala:849)
> > > at
> > > kafka.server.ReplicaManager.fetchMessages(ReplicaManager.scala:467)
> > > at
> kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:434)
> > > at kafka.server.KafkaApis.handle(KafkaApis.scala:69)
> > > at
> > > kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:60)
> > > at java.lang.Thread.run(Thread.java:722)
> > >
> >
>