Re: Topics being automatically deleted?

2016-09-15 Thread Manikumar Reddy
It looks like you have not changed the default data log directory. By default,
Kafka is configured to store its data logs under the /tmp/ folder, and /tmp gets
cleared on system reboots. Change the log.dirs config property to some other
directory.
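
A minimal sketch of that check, in Python (not part of Kafka): it reads the
text of a server.properties file and reports any data directories under /tmp.
The helper names are made up for illustration, and the fallback to
/tmp/kafka-logs when neither log.dirs nor log.dir is set is an assumption
based on the stock broker defaults.

```python
def parse_properties(text):
    """Parse simple key=value lines, skipping blanks and # comments."""
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, value = line.split("=", 1)
        props[key.strip()] = value.strip()
    return props


def data_dirs_under_tmp(properties_text):
    """Return the configured data directories that live under /tmp."""
    props = parse_properties(properties_text)
    # Assumption: log.dirs wins over log.dir, and the default is /tmp/kafka-logs.
    configured = props.get("log.dirs") or props.get("log.dir") or "/tmp/kafka-logs"
    dirs = [d.strip() for d in configured.split(",") if d.strip()]
    return [d for d in dirs if d == "/tmp" or d.startswith("/tmp/")]
```

An empty result means none of the configured directories would be wiped when
/tmp is cleared.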

On Thu, Sep 15, 2016 at 11:46 AM, Ali Akhtar  wrote:

> I've noticed that, on my own machine, if I start a kafka broker, then
> create a topic, then I stop that server and restart it, the topic I created
> is still kept.
>
> However, on restarts, it looks like the topic is deleted.
>
> It's also possible that the default retention policy of 24 hours causes the
> messages to be deleted.
>
> However, even if the messages are deleted, shouldn't the topic itself be
> preserved?
>
> What would happen if there are no new messages posted to a topic for a
> week, but I still have consumers listening to that topic? If the topic is
> deleted, will I need to restart my consumer? This seems like a pain.
>


Re: [ANNOUNCE] New committer: Jason Gustafson

2016-09-06 Thread Manikumar Reddy
congrats,  Jason!

On Wed, Sep 7, 2016 at 9:28 AM, Ashish Singh  wrote:

> Congrats, Jason!
>
> On Tuesday, September 6, 2016, Jason Gustafson  wrote:
>
> > Thanks all!
> >
> > On Tue, Sep 6, 2016 at 5:13 PM, Becket Qin wrote:
> >
> > > Congrats, Jason!
> > >
> > > On Tue, Sep 6, 2016 at 5:09 PM, Onur Karaman wrote:
> > >
> > > > congrats jason!
> > > >
> > > > On Tue, Sep 6, 2016 at 4:12 PM, Sriram Subramanian wrote:
> > > >
> > > > > Congratulations Jason!
> > > > >
> > > > > On Tue, Sep 6, 2016 at 3:40 PM, Vahid S Hashemian <vahidhashem...@us.ibm.com> wrote:
> > > > >
> > > > > > Congratulations Jason on this very well deserved recognition.
> > > > > >
> > > > > > --Vahid
> > > > > >
> > > > > >
> > > > > >
> > > > > > From:    Neha Narkhede
> > > > > > To:      "d...@kafka.apache.org" <d...@kafka.apache.org>,
> > > > > >          "users@kafka.apache.org"
> > > > > > Cc:      "priv...@kafka.apache.org" <priv...@kafka.apache.org>
> > > > > > Date:    09/06/2016 03:26 PM
> > > > > > Subject: [ANNOUNCE] New committer: Jason Gustafson
> > > > > >
> > > > > >
> > > > > >
> > > > > > The PMC for Apache Kafka has invited Jason Gustafson to join as a
> > > > > > committer and we are pleased to announce that he has accepted!
> > > > > >
> > > > > > Jason has contributed numerous patches to a wide range of areas,
> > > > > > notably within the new consumer and the Kafka Connect layers. He has
> > > > > > displayed great taste and judgement which has been apparent through his
> > > > > > involvement across the board from mailing lists, JIRA, code reviews to
> > > > > > contributing features, bug fixes and code and documentation improvements.
> > > > > >
> > > > > > Thank you for your contribution and welcome to Apache Kafka, Jason!
> > > > > > --
> > > > > > Thanks,
> > > > > > Neha
> > > > > >
> > > > > >
> > > > > >
> > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>
>
> --
> Ashish h
>


Re: Understand producer metrics

2016-08-18 Thread Manikumar Reddy
This doc link may help:

http://kafka.apache.org/documentation.html#new_producer_monitoring

On Fri, Aug 19, 2016 at 2:36 AM, David Yu  wrote:

> Kafka users,
>
> I want to resurface this post since it has become crucial for our team to
> understand the recent Samza throughput issues we are facing.
>
> Any help is appreciated.
>
> Thanks,
> David
>
> On Tue, Aug 2, 2016 at 10:30 PM David Yu  wrote:
>
> > I'm having a hard time finding documentation explaining the set of
> > producer metrics exposed by Kafka. Can anyone explain the following?
> >
> >
> >    - batch-size-avg - Is this the number of msgs or number of bytes? Does
> >      this only make sense for async producers?
> >    - incoming-byte-rate/outgoing-byte-rate - Is this the number of bytes
> >      in and out of the producer? If so, should they be about the same?
> >    - record-queue-time-avg - Is this the avg ms a record is buffered?
> >    - record-send-rate - In bytes or # of msgs?
> >    - record-size-avg - In bytes?
> >    - record-per-request-avg - What is a request? Is that the same as a
> >      "batch"?
> >    - request-latency-avg - In ms? How is this measured?
> >    - request-size-avg - In bytes? This doesn't seem to match
> >      batch-size-avg, which makes me think request and batch are different.
> >
> >
> > Thanks,
> > David
> >
>


Re: Unable to write, leader not available

2016-08-03 Thread Manikumar Reddy
Hi,

Can you enable the authorizer debug logs and check for log entries related to
denied operations?
You should also allow the required operations on the Cluster resource.


Thanks,
Manikumar

On Thu, Aug 4, 2016 at 1:51 AM, Bryan Baugher  wrote:

> Hi everyone,
>
> I was trying out kerberos on Kafka 0.10.0.0 by creating a single node
> cluster. I managed to get everything setup and past all the authentication
> errors but whenever I try to use the console producer I get 'Error while
> fetching metadata ... LEADER_NOT_AVAILABLE'. In this case I've created the
> topic ahead of time (1 replica, 1 partition) and I can see that broker 0 is
> in the ISR and is the leader. I have also opened an ACL to the topic for my
> user to produce and was previously seeing authentication errors prior. I
> don't see any errors or helpful logs on the broker side even after turning
> on debug logging. Turning on debug logging on the client the only thing
> that stands out is that it lists the broker as 'node -1' instead of 0. It
> does mention the correct hostname/port and that it was able to successfully
> connect. Any ideas?
>
> Bryan
>


Re: [kafka-clients] [VOTE] 0.10.0.1 RC1

2016-08-03 Thread Manikumar Reddy
Hi,

There are two versions of the slf4j-log4j12 jar in the build (1.6.1 and 1.7.21).
slf4j-log4j12-1.6.1.jar is coming from the streams:examples module.
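
A quick way to spot this kind of duplicate is to group the jar file names in
the release's libs directory by artifact name. The following Python sketch is
only an illustration: the function name is hypothetical, and it assumes jar
names follow the usual <artifact>-<version>.jar pattern with a version that
starts with a digit.

```python
import re
from collections import defaultdict

# Assumption: "<artifact>-<version>.jar", where the version starts with a digit.
JAR_RE = re.compile(r"^(?P<name>.+?)-(?P<version>\d[\w.]*)\.jar$")


def find_version_conflicts(jar_names):
    """Map each artifact that appears with more than one version to its versions."""
    versions = defaultdict(set)
    for jar in jar_names:
        match = JAR_RE.match(jar)
        if match:
            versions[match.group("name")].add(match.group("version"))
    return {name: sorted(found) for name, found in versions.items() if len(found) > 1}


if __name__ == "__main__":
    jars = ["slf4j-log4j12-1.6.1.jar", "slf4j-log4j12-1.7.21.jar", "slf4j-api-1.7.21.jar"]
    print(find_version_conflicts(jars))
```

In practice the list of names would come from something like os.listdir() on
the unpacked libs directory.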

Thanks,
Manikumar

On Tue, Aug 2, 2016 at 8:31 PM, Ismael Juma  wrote:

> Hello Kafka users, developers and client-developers,
>
> This is the second candidate for the release of Apache Kafka 0.10.0.1.
> This is a bug fix release and it includes fixes and improvements from 52
> JIRAs (including a few critical bugs). See the release notes for more
> details:
>
> http://home.apache.org/~ijuma/kafka-0.10.0.1-rc1/RELEASE_NOTES.html
>
> When compared to RC0, RC1 contains fixes for two bugs (KAFKA-4008
> and KAFKA-3950) and a couple of test stabilisation fixes.
>
> *** Please download, test and vote by Friday, 5 August, 8am PT ***
>
> Kafka's KEYS file containing PGP keys we use to sign the release:
> http://kafka.apache.org/KEYS
>
> * Release artifacts to be voted upon (source and binary):
> http://home.apache.org/~ijuma/kafka-0.10.0.1-rc1/
>
> * Maven artifacts to be voted upon:
> https://repository.apache.org/content/groups/staging
>
> * Javadoc:
> http://home.apache.org/~ijuma/kafka-0.10.0.1-rc1/javadoc/
>
> * Tag to be voted upon (off 0.10.0 branch) is the 0.10.0.1-rc1 tag:
>
> https://git-wip-us.apache.org/repos/asf?p=kafka.git;a=tag;h=108580e4594d694827c953264969fe1ce2a7
>
> * Documentation:
> http://kafka.apache.org/0100/documentation.html
>
> * Protocol:
> http://kafka.apache.org/0100/protocol.html
>
> * Successful Jenkins builds for the 0.10.0 branch:
> Unit/integration tests: https://builds.apache.org/job/kafka-0.10.0-jdk7/179/
> System tests: https://jenkins.confluent.io/job/system-test-kafka-0.10.0/136/
>
> Thanks,
> Ismael
>
> --
> You received this message because you are subscribed to the Google Groups
> "kafka-clients" group.
> To unsubscribe from this group and stop receiving emails from it, send an
> email to kafka-clients+unsubscr...@googlegroups.com.
> To post to this group, send email to kafka-clie...@googlegroups.com.
> Visit this group at https://groups.google.com/group/kafka-clients.
> To view this discussion on the web visit
> https://groups.google.com/d/msgid/kafka-clients/CAD5tkZaRxAjQbwS_1q4MqskSYKxQWBFmdPVf_PP020bjY9%3DCgQ%40mail.gmail.com
> 
> .
> For more options, visit https://groups.google.com/d/optout.
>


Re: Topic not getting deleted on 0.8.2.1

2016-07-28 Thread Manikumar Reddy
Many issues related to the delete-topic functionality have been fixed in the
latest versions. It is highly recommended to move to the latest version.
https://issues.apache.org/jira/browse/KAFKA-1757 fixes a similar issue on the
Windows platform.

On Thu, Jul 28, 2016 at 3:40 PM, Ghosh, Prabal Kumar <
prabal.kumar.gh...@sap.com> wrote:

> Hi Kafka Users,
>
> We are using Kafka 0.8.2.1. We are not able to delete any topic.
> We are using AdminUtils to create and delete topics.
> The topics get created successfully with the correct Leader and isr for each
> topic partition.
> But when we try to delete a topic using AdminUtils.deleteTopic(), it
> fails. The topic is indefinitely
> marked for deletion. When I look at the topic details, all partitions have
> Leader:-1 and isr:{}.
> For cleanup, I had to manually delete the topic partitions from zk and kafka
> and restart both processes.
> But since we are going to use Kafka in production, the above cleanup
> approach won't work.
>
> I have set delete.topic.enable=true.
>
> Any suggestions?
>
> Controller Logs:
>
> [2016-07-28 20:52:16,670] DEBUG [Replica state machine on controller 0]:
> Are all replicas for topic jick deleted
> Map([Topic=jick,Partition=1,Replica=0] -> ReplicaDeletionIneligible,
> [Topic=jick,Partition=2,Replica=0] -> ReplicaDeletionStarted,
> [Topic=jick,Partition=0,Replica=0] -> ReplicaDeletionIneligible)
> (kafka.controller.ReplicaStateMachine)
> [2016-07-28 20:52:16,670] INFO [delete-topics-thread-0], Deletion for
> replicas 0 for partition [jick,2] of topic jick in progress
> (kafka.controller.TopicDeletionManager$DeleteTopicsThread)
> [2016-07-28 20:52:16,670] INFO [delete-topics-thread-0], Not retrying
> deletion of topic jick at this time since it is marked ineligible for
> deletion (kafka.controller.TopicDeletionManager$DeleteTopicsThread)
> [2016-07-28 20:52:16,670] DEBUG [Topic Deletion Manager 0], Waiting for
> signal to start or continue topic deletion
> (kafka.controller.TopicDeletionManager)
> [2016-07-28 20:52:16,670] DEBUG [Topic Deletion Manager 0], Delete topic
> callback invoked for StopReplicaResponse(36,Map([jick,2] -> -1),0)
> (kafka.controller.TopicDeletionManager)
> [2016-07-28 20:52:16,670] DEBUG [Topic Deletion Manager 0], Deletion
> failed for replicas [Topic=jick,Partition=2,Replica=0]. Halting deletion
> for topics Set(jick) (kafka.controller.TopicDeletionManager)
> [2016-07-28 20:52:16,670] INFO [Replica state machine on controller 0]:
> Invoking state change to ReplicaDeletionIneligible for replicas
> [Topic=jick,Partition=2,Replica=0] (kafka.controller.ReplicaStateMachine)
> [2016-07-28 20:52:16,670] INFO [Topic Deletion Manager 0], Halted deletion
> of topics jick (kafka.controller.TopicDeletionManager)
> [2016-07-28 20:52:16,670] INFO [delete-topics-thread-0], Handling deletion
> for topics HCP-BIGDATA,tick,hick,jick
> (kafka.controller.TopicDeletionManager$DeleteTopicsThread)
> [2016-07-28 20:52:16,670] DEBUG [Replica state machine on controller 0]:
> Are all replicas for topic HCP-BIGDATA deleted
> Map([Topic=HCP-BIGDATA,Partition=0,Replica=0] -> OfflineReplica,
> [Topic=HCP-BIGDATA,Partition=2,Replica=0] -> OfflineReplica,
> [Topic=HCP-BIGDATA,Partition=1,Replica=0] -> OfflineReplica)
> (kafka.controller.ReplicaStateMachine)
> [2016-07-28 20:52:16,670] INFO [delete-topics-thread-0], Not retrying
> deletion of topic HCP-BIGDATA at this time since it is marked ineligible
> for deletion (kafka.controller.TopicDeletionManager$DeleteTopicsThread)
> [2016-07-28 20:52:16,670] DEBUG [Replica state machine on controller 0]:
> Are all replicas for topic tick deleted
> Map([Topic=tick,Partition=2,Replica=0] -> OfflineReplica,
> [Topic=tick,Partition=1,Replica=0] -> OfflineReplica,
> [Topic=tick,Partition=0,Replica=0] -> OfflineReplica)
> (kafka.controller.ReplicaStateMachine)
> [2016-07-28 20:52:16,670] INFO [delete-topics-thread-0], Not retrying
> deletion of topic tick at this time since it is marked ineligible for
> deletion (kafka.controller.TopicDeletionManager$DeleteTopicsThread)
> [2016-07-28 20:52:16,670] DEBUG [Replica state machine on controller 0]:
> Are all replicas for topic hick deleted
> Map([Topic=hick,Partition=1,Replica=0] -> OfflineReplica,
> [Topic=hick,Partition=0,Replica=0] -> OfflineReplica,
> [Topic=hick,Partition=2,Replica=0] -> OfflineReplica)
> (kafka.controller.ReplicaStateMachine)
> [2016-07-28 20:52:16,670] INFO [delete-topics-thread-0], Not retrying
> deletion of topic hick at this time since it is marked ineligible for
> deletion (kafka.controller.TopicDeletionManager$DeleteTopicsThread)
> [2016-07-28 20:52:16,686] DEBUG [Replica state machine on controller 0]:
> Are all replicas for topic jick deleted
> Map([Topic=jick,Partition=1,Replica=0] -> ReplicaDeletionIneligible,
> [Topic=jick,Partition=2,Replica=0] -> ReplicaDeletionIneligible,
> [Topic=jick,Partition=0,Replica=0] -> ReplicaDeletionIneligible)
> (kafka.controller.ReplicaStateMachine)
> [2016-07-28 20:52:16,686] INFO [Topic Deletion Manager 0], Retrying delete
> topic 

Re: Synchronized block in StreamTask

2016-07-28 Thread Manikumar Reddy
You already got a reply from Guozhang on the dev mailing list.

On Thu, Jul 28, 2016 at 7:09 AM, Pierre Coquentin <
pierre.coquen...@gmail.com> wrote:

> Hi,
>
> I've a simple technical question about kafka streams.
> In class org.apache.kafka.streams.processor.internals.StreamTask, the
> method "process" uses a synchronized block but I don't see why; the method
> doesn't seem to be called in a multi-threaded environment as it's created
> and only accessed by a specific thread
> org.apache.kafka.streams.processor.internals.StreamThread.
> Am I missing something? Or, as the API is unstable, is this class meant
> to be shared between several threads in the future?
>
> Regards,
>
> Pierre
>


Re: Log retention not working

2016-07-27 Thread Manikumar Reddy
Also check whether any value is set for the log.retention.bytes broker config.
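
To see how a size limit can purge data well before a 7-day time limit, here is
a rough Python model of size-based retention. It is a simplification written
for this thread, not Kafka's code: it assumes the limit applies per partition,
that whole segments are deleted oldest first, and that a negative value means
"no size limit". The function name and the sizes are hypothetical.

```python
def segments_deleted_by_size(segment_sizes, retention_bytes):
    """Count how many of the oldest segments a size limit would remove.

    segment_sizes: segment sizes in bytes, oldest first.
    retention_bytes: the size limit; a negative value disables it.
    """
    if retention_bytes < 0:
        return 0
    excess = sum(segment_sizes) - retention_bytes
    deleted = 0
    for size in segment_sizes:
        # Only drop a segment if the partition would still hold at least
        # retention_bytes afterwards.
        if excess - size < 0:
            break
        excess -= size
        deleted += 1
    return deleted
```

If this returns a non-zero count for a partition's real segment sizes, the
size limit rather than the time limit is what is removing data.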

On Wed, Jul 27, 2016 at 8:03 PM, Samuel Taylor  wrote:

> Is it possible that your log directory is in /tmp/ and your OS is deleting
> that directory? I know it's happened to me before.
>
> - Samuel
>
> On Jul 27, 2016 13:43, "David Yu"  wrote:
>
> > We are using Kafka 0.8.2.0 provided by CDH. Our Kafka retention is set to
> > default 7 days. One problem we have with one of our topics is that, the
> > logs are purged within two days. This topic does not override any default
> > settings.
> >
> > Just wanna get some pointers as to how to debug this issue.
> >
> > Thanks,
> > David
> >
>


Re: Consumer Offsets and Open FDs

2016-07-19 Thread Manikumar Reddy
Thanks for correcting me, Tom. I got confused by the WARN log message.
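
Tom's point can be checked with some arithmetic. The Python sketch below is a
model built on assumptions, not the LogCleaner source: it assumes each
offset-map entry costs 24 bytes (a 16-byte hash plus an 8-byte offset), a load
factor of 0.9, an even split of log.cleaner.dedupe.buffer.size across cleaner
threads, and the 2G per-thread cap mentioned in the WARN message. Under those
assumptions a 134217728-byte (128 MB) buffer with one thread gives 5033164
entries, which matches the figure in the 2016-06-24 trace.

```python
INT_MAX = 2**31 - 1        # the "2G per cleaner thread" cap from the WARN message
BYTES_PER_ENTRY = 24       # assumption: 16-byte key hash + 8-byte offset


def offset_map_capacity(dedupe_buffer_bytes, cleaner_threads, load_factor=0.9):
    """Estimate how many distinct keys one cleaner thread's offset map can hold."""
    per_thread = min(dedupe_buffer_bytes // cleaner_threads, INT_MAX)
    slots = per_thread // BYTES_PER_ENTRY
    return int(slots * load_factor)


if __name__ == "__main__":
    for threads in (1, 2, 4):
        print(threads, offset_map_capacity(134217728, threads))
```

More threads means fewer entries per thread, so adding threads makes the
"offset map can fit only N" failure more likely, not less.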

On Tue, Jul 19, 2016 at 5:45 PM, Tom Crayford <tcrayf...@heroku.com> wrote:

> Manikumar,
>
> How will that help? Increasing the number of log cleaner threads will lead
> to *less* memory for the buffer per thread, as it's divided up among
> available threads.
>
> Lawrence, I'm reasonably sure you're hitting KAFKA-3587 here, and should
> upgrade to 0.10 ASAP. As far as I'm aware Kafka doesn't have any
> backporting or stable versions policy, so the only ways to get that patch
> are a) upgrade b) backport the patch yourself. b) seems extremely risky to
> me
>
> Thanks
>
> Tom
>
> On Tue, Jul 19, 2016 at 5:49 AM, Manikumar Reddy <
> manikumar.re...@gmail.com>
> wrote:
>
> > Try increasing log cleaner threads.
> >
> > On Tue, Jul 19, 2016 at 1:40 AM, Lawrence Weikum <lwei...@pandora.com>
> > wrote:
> >
> > > It seems that the log-cleaner is still failing no matter what settings I
> > > give it.
> > >
> > > Here is the full output from one of our brokers:
> > > [2016-07-18 13:00:40,726] ERROR [kafka-log-cleaner-thread-0], Error due
> > > to  (kafka.log.LogCleaner)
> > > java.lang.IllegalArgumentException: requirement failed: 192053210 messages
> > > in segment __consumer_offsets-15/.log but offset map
> > > can fit only 7499. You can increase log.cleaner.dedupe.buffer.size or
> > > decrease log.cleaner.threads
> > > at scala.Predef$.require(Predef.scala:219)
> > > at kafka.log.Cleaner$$anonfun$buildOffsetMap$4.apply(LogCleaner.scala:584)
> > > at kafka.log.Cleaner$$anonfun$buildOffsetMap$4.apply(LogCleaner.scala:580)
> > > at scala.collection.immutable.Stream$StreamWithFilter.foreach(Stream.scala:570)
> > > at kafka.log.Cleaner.buildOffsetMap(LogCleaner.scala:580)
> > > at kafka.log.Cleaner.clean(LogCleaner.scala:322)
> > > at kafka.log.LogCleaner$CleanerThread.cleanOrSleep(LogCleaner.scala:230)
> > > at kafka.log.LogCleaner$CleanerThread.doWork(LogCleaner.scala:208)
> > > at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
> > > [2016-07-18 13:00:40,732] INFO [kafka-log-cleaner-thread-0], Stopped
> > > (kafka.log.LogCleaner)
> > >
> > > Currently, I have heap allocation up to 64GB, only one log-cleaning thread
> > > is set to run, and log.cleaner.dedupe.buffer.size is 2GB.  I get this error
> > > if I try to increase it anymore:
> > >
> > > WARN [kafka-log-cleaner-thread-0], Cannot use more than 2G of cleaner
> > > buffer space per cleaner thread, ignoring excess buffer space...
> > > (kafka.log.LogCleaner)
> > >
> > > Is there something else I can do to help the broker compact the
> > > __consumer_offset topics?
> > >
> > > Thank you again for your help!
> > >
> > > Lawrence Weikum
> > >
> > > On 7/13/16, 1:06 PM, "Rakesh Vidyadharan" <rvidyadha...@gracenote.com>
> > > wrote:
> > >
> > > We ran into this as well, and I ended up with the following that works for
> > > us.
> > >
> > > log.cleaner.dedupe.buffer.size=536870912
> > > log.cleaner.io.buffer.size=2000
> > >
> > >
> > >
> > >
> > >
> > > On 13/07/2016 14:01, "Lawrence Weikum" <lwei...@pandora.com> wrote:
> > >
> > > >Apologies. Here is the full trace from a broker:
> > > >
> > > >[2016-06-24 09:57:39,881] ERROR [kafka-log-cleaner-thread-0], Error due to  (kafka.log.LogCleaner)
> > > >java.lang.IllegalArgumentException: requirement failed: 9730197928 messages in segment __consumer_offsets-36/.log but offset map can fit only 5033164. You can increase log.cleaner.dedupe.buffer.size or decrease log.cleaner.threads
> > > >at scala.Predef$.require(Predef.scala:219)
> > > >at kafka.log.Cleaner$$anonfun$buildOffsetMap$4.apply(LogCleaner.scala:584)
> > > >at kafka.log.Cleaner$$anonfun$buildOffsetMap$4.apply(LogCleaner.scala:580)
> > > >at scala.collection.immutable.Stream$StreamWithFilter.foreach(Stream.scala:570)
> > > >at kafka.log.Cleaner.bui

Re: Consumer Offsets and Open FDs

2016-07-18 Thread Manikumar Reddy
Try increasing log cleaner threads.

On Tue, Jul 19, 2016 at 1:40 AM, Lawrence Weikum <lwei...@pandora.com>
wrote:

> It seems that the log-cleaner is still failing no matter what settings I
> give it.
>
> Here is the full output from one of our brokers:
> [2016-07-18 13:00:40,726] ERROR [kafka-log-cleaner-thread-0], Error due
> to  (kafka.log.LogCleaner)
> java.lang.IllegalArgumentException: requirement failed: 192053210 messages
> in segment __consumer_offsets-15/.log but offset map
> can fit only 7499. You can increase log.cleaner.dedupe.buffer.size or
> decrease log.cleaner.threads
> at scala.Predef$.require(Predef.scala:219)
> at kafka.log.Cleaner$$anonfun$buildOffsetMap$4.apply(LogCleaner.scala:584)
> at kafka.log.Cleaner$$anonfun$buildOffsetMap$4.apply(LogCleaner.scala:580)
> at scala.collection.immutable.Stream$StreamWithFilter.foreach(Stream.scala:570)
> at kafka.log.Cleaner.buildOffsetMap(LogCleaner.scala:580)
> at kafka.log.Cleaner.clean(LogCleaner.scala:322)
> at kafka.log.LogCleaner$CleanerThread.cleanOrSleep(LogCleaner.scala:230)
> at kafka.log.LogCleaner$CleanerThread.doWork(LogCleaner.scala:208)
> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
> [2016-07-18 13:00:40,732] INFO [kafka-log-cleaner-thread-0], Stopped
> (kafka.log.LogCleaner)
>
> Currently, I have heap allocation up to 64GB, only one log-cleaning thread
> is set to run, and log.cleaner.dedupe.buffer.size is 2GB.  I get this error
> if I try to increase it anymore:
>
> WARN [kafka-log-cleaner-thread-0], Cannot use more than 2G of cleaner
> buffer space per cleaner thread, ignoring excess buffer space...
> (kafka.log.LogCleaner)
>
> Is there something else I can do to help the broker compact the
> __consumer_offset topics?
>
> Thank you again for your help!
>
> Lawrence Weikum
>
> On 7/13/16, 1:06 PM, "Rakesh Vidyadharan" <rvidyadha...@gracenote.com>
> wrote:
>
> We ran into this as well, and I ended up with the following that works for
> us.
>
> log.cleaner.dedupe.buffer.size=536870912
> log.cleaner.io.buffer.size=2000
>
>
>
>
>
> On 13/07/2016 14:01, "Lawrence Weikum" <lwei...@pandora.com> wrote:
>
> >Apologies. Here is the full trace from a broker:
> >
> >[2016-06-24 09:57:39,881] ERROR [kafka-log-cleaner-thread-0], Error due to  (kafka.log.LogCleaner)
> >java.lang.IllegalArgumentException: requirement failed: 9730197928 messages in segment __consumer_offsets-36/.log but offset map can fit only 5033164. You can increase log.cleaner.dedupe.buffer.size or decrease log.cleaner.threads
> >at scala.Predef$.require(Predef.scala:219)
> >at kafka.log.Cleaner$$anonfun$buildOffsetMap$4.apply(LogCleaner.scala:584)
> >at kafka.log.Cleaner$$anonfun$buildOffsetMap$4.apply(LogCleaner.scala:580)
> >at scala.collection.immutable.Stream$StreamWithFilter.foreach(Stream.scala:570)
> >at kafka.log.Cleaner.buildOffsetMap(LogCleaner.scala:580)
> >at kafka.log.Cleaner.clean(LogCleaner.scala:322)
> >at kafka.log.LogCleaner$CleanerThread.cleanOrSleep(LogCleaner.scala:230)
> >at kafka.log.LogCleaner$CleanerThread.doWork(LogCleaner.scala:208)
> >at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
> >[2016-06-24 09:57:39,881] INFO [kafka-log-cleaner-thread-0], Stopped (kafka.log.LogCleaner)
> >
> >
> >Is log.cleaner.dedupe.buffer.size a broker setting?  What is a good number to set it to?
> >
> >
> >
> >Lawrence Weikum
> >
> >
> >On 7/13/16, 11:18 AM, "Manikumar Reddy" <manikumar.re...@gmail.com> wrote:
> >
> >Can you post the complete error stack trace?   Yes, you need to restart the
> >affected brokers.
> >You can tweak log.cleaner.dedupe.buffer.size, log.cleaner.io.buffer.size
> >configs.
> >
> >Some related JIRAs:
> >
> >https://issues.apache.org/jira/browse/KAFKA-3587
> >https://issues.apache.org/jira/browse/KAFKA-3894
> >https://issues.apache.org/jira/browse/KAFKA-3915
> >
> >On Wed, Jul 13, 2016 at 10:36 PM, Lawrence Weikum <lwei...@pandora.com>
> >wrote:
> >
> >> Oh interesting. I didn’t know about that log file until now.
> >>
> >> The only error that has been populated among all brokers showing this
> >> behavior is:
> >>
> >> ERROR [kafka-log-cleaner-thread-0], Error due to  (kafka.log.LogCleaner)
> >>
> >> Then we see many me

Re: Enabling PLAINTEXT inter broker security

2016-07-15 Thread Manikumar Reddy
Hi,

Which Kafka version are you using?
SASL/PLAIN support is available from the Kafka 0.10.0.0 release onwards.
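
As a small illustration of that version cut-off (a Python sketch, with a
made-up function name), a broker version string can be compared against
0.10.0.0 like this. It assumes purely numeric, dot-separated versions and
treats missing trailing components as zero.

```python
def supports_sasl_plain(version):
    """True if a dotted Kafka version string is 0.10.0.0 or later."""
    parts = [int(p) for p in version.split(".")]
    parts += [0] * (4 - len(parts))  # pad "0.10" to "0.10.0.0"
    return tuple(parts) >= (0, 10, 0, 0)
```

On an older broker the PlainLoginModule class would simply not be on the
classpath, which is consistent with the "unable to find LoginModule class"
error quoted below.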


Thanks
Manikumar

On Fri, Jul 15, 2016 at 4:22 PM, cs user  wrote:

> Apologies, just to be clear, my broker settings are actually as below,
> using PLAINTEXT throughout
>
> listeners=SASL_PLAINTEXT://host.name:port
> security.inter.broker.protocol=SASL_PLAINTEXT
> sasl.mechanism.inter.broker.protocol=PLAIN
> sasl.enabled.mechanisms=PLAIN
>
>
> On Fri, Jul 15, 2016 at 11:50 AM, cs user  wrote:
>
> > Hi All,
> >
> > I'm dipping my toes into kafka security, I'm following the guide here:
> >
> > http://kafka.apache.org/documentation.html#security_sasl_plain_brokerconfig
> > and
> > http://kafka.apache.org/documentation.html#security_sasl_brokerconfig
> >
> > My jaas config file looks like:
> >
> > KafkaServer {
> > org.apache.kafka.common.security.plain.PlainLoginModule required
> > username="admin"
> > password="admin-secret"
> > user_admin="admin-secret"
> > user_alice="alice-secret";
> > };
> >
> > I pass the following to kafka on startup to load the above in:
> >
> > -Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf
> >
> >
> > I'm using the following for my broker settings, using PLAINTEXT throughout:
> >
> > listeners=SASL_PLAINTEXT://host.name:port
> > security.inter.broker.protocol=SASL_SSL
> > sasl.mechanism.inter.broker.protocol=PLAIN
> > sasl.enabled.mechanisms=PLAIN
> >
> >
> >
> > However when kafka starts up I get the following error message:
> >
> > Caused by: javax.security.auth.login.LoginException: unable to find
> > LoginModule class: org.apache.kafka.common.security.plain.PlainLoginModule
> >
> > Any idea why I would be getting this error?
> >
> > Thanks!
> >
>


Re: Consumer Offsets and Open FDs

2016-07-13 Thread Manikumar Reddy
Can you post the complete error stack trace? Yes, you need to restart the
affected brokers.
You can tweak the log.cleaner.dedupe.buffer.size and log.cleaner.io.buffer.size
configs.

Some related JIRAs:

https://issues.apache.org/jira/browse/KAFKA-3587
https://issues.apache.org/jira/browse/KAFKA-3894
https://issues.apache.org/jira/browse/KAFKA-3915

On Wed, Jul 13, 2016 at 10:36 PM, Lawrence Weikum <lwei...@pandora.com>
wrote:

> Oh interesting. I didn’t know about that log file until now.
>
> The only error that has been populated among all brokers showing this
> behavior is:
>
> ERROR [kafka-log-cleaner-thread-0], Error due to  (kafka.log.LogCleaner)
>
> Then we see many messages like this:
>
> INFO Compaction for partition [__consumer_offsets,30] is resumed
> (kafka.log.LogCleaner)
> INFO The cleaning for partition [__consumer_offsets,30] is aborted
> (kafka.log.LogCleaner)
>
> Using Visual VM, I do not see any log-cleaner threads in those brokers.  I
> do see it in the brokers not showing this behavior though.
>
> Any idea why the LogCleaner failed?
>
> As a temporary fix, should we restart the affected brokers?
>
> Thanks again!
>
>
> Lawrence Weikum
>
> On 7/13/16, 10:34 AM, "Manikumar Reddy" <manikumar.re...@gmail.com> wrote:
>
> Hi,
>
> Are you seeing any errors in log-cleaner.log?  The log-cleaner thread can
> crash on certain errors.
>
> Thanks
> Manikumar
>
> On Wed, Jul 13, 2016 at 9:54 PM, Lawrence Weikum <lwei...@pandora.com>
> wrote:
>
> > Hello,
> >
> > We’re seeing a strange behavior in Kafka 0.9.0.1 which occurs about every
> > other week.  I’m curious if others have seen it and know of a solution.
> >
> > Setup and Scenario:
> >
> > -  Brokers initially setup with log compaction turned off
> >
> > -  After 30 days, log compaction was turned on
> >
> > -  At this time, the number of Open FDs was ~ 30K per broker.
> >
> > -  After 2 days, the __consumer_offsets topic was compacted
> > fully.  Open FDs reduced to ~5K per broker.
> >
> > -  Cluster has been under normal load for roughly 7 days.
> >
> > -  At the 7 day mark, __consumer_offsets topic seems to have
> > stopped compacting on two of the brokers, and on those brokers, the FD
> > count is up to ~25K.
> >
> >
> > We have tried rebalancing the partitions before.  The first time, the
> > destination broker had compacted the data fine and open FDs were low. The
> > second time, the destination broker kept the FDs open.
> >
> >
> > In all the broker logs, we’re seeing these messages:
> > INFO [Group Metadata Manager on Broker 8]: Removed 0 expired offsets in 0
> > milliseconds. (kafka.coordinator.GroupMetadataManager)
> >
> > There are only 4 consumers at the moment on the cluster; one topic with 92
> > partitions.
> >
> > Is there a reason why log compaction may stop working or why the
> > __consumer_offsets topic would start holding thousands of FDs?
> >
> > Thank you all for your help!
> >
> > Lawrence Weikum
> >
> >
>
>
>


Re: Consumer Offsets and Open FDs

2016-07-13 Thread Manikumar Reddy
Hi,

Are you seeing any errors in log-cleaner.log?  The log-cleaner thread can
crash on certain errors.
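
A small Python sketch for that check: it scans lines from log-cleaner.log for
ERROR entries, or a "Stopped" message, from a cleaner thread. The regular
expression is an assumption modelled on the format of the cleaner-thread lines
quoted in this thread, and the function name is hypothetical.

```python
import re

# Assumed line shape: "[timestamp] LEVEL [kafka-log-cleaner-thread-N], message"
LINE_RE = re.compile(
    r"^\[(?P<ts>[^\]]+)\] (?P<level>[A-Z]+) "
    r"\[(?P<thread>kafka-log-cleaner-thread-\d+)\], (?P<msg>.*)$"
)


def cleaner_thread_events(lines):
    """Return (timestamp, thread, level, message) for errors and thread stops."""
    events = []
    for line in lines:
        match = LINE_RE.match(line.strip())
        if not match:
            continue
        if match.group("level") == "ERROR" or "Stopped" in match.group("msg"):
            events.append(
                (match.group("ts"), match.group("thread"),
                 match.group("level"), match.group("msg"))
            )
    return events
```

An ERROR followed by "Stopped" for the same thread suggests the cleaner thread
has died and compaction will not resume until the broker is restarted.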

Thanks
Manikumar

On Wed, Jul 13, 2016 at 9:54 PM, Lawrence Weikum 
wrote:

> Hello,
>
> We’re seeing a strange behavior in Kafka 0.9.0.1 which occurs about every
> other week.  I’m curious if others have seen it and know of a solution.
>
> Setup and Scenario:
>
> -  Brokers initially setup with log compaction turned off
>
> -  After 30 days, log compaction was turned on
>
> -  At this time, the number of Open FDs was ~ 30K per broker.
>
> -  After 2 days, the __consumer_offsets topic was compacted
> fully.  Open FDs reduced to ~5K per broker.
>
> -  Cluster has been under normal load for roughly 7 days.
>
> -  At the 7 day mark, __consumer_offsets topic seems to have
> stopped compacting on two of the brokers, and on those brokers, the FD
> count is up to ~25K.
>
>
> We have tried rebalancing the partitions before.  The first time, the
> destination broker had compacted the data fine and open FDs were low. The
> second time, the destination broker kept the FDs open.
>
>
> In all the broker logs, we’re seeing these messages:
> INFO [Group Metadata Manager on Broker 8]: Removed 0 expired offsets in 0
> milliseconds. (kafka.coordinator.GroupMetadataManager)
>
> There are only 4 consumers at the moment on the cluster; one topic with 92
> partitions.
>
> Is there a reason why log compaction may stop working or why the
> __consumer_offsets topic would start holding thousands of FDs?
>
> Thank you all for your help!
>
> Lawrence Weikum
>
>


Fwd: consumer.subscribe(Pattern p , ..) method fails with Authorizer

2016-07-08 Thread Manikumar Reddy
Hi,

The consumer.subscribe(Pattern p, ..) method implementation tries to get the
metadata of all topics.
This throws a TopicAuthorizationException on internal topics and other
unauthorized topics.
We may need to move the pattern matching to the server side.
Is this a known issue? If not, I will raise a JIRA.

logs:
[2016-07-07 22:48:06,317] WARN Error while fetching metadata with
correlation id 1 : {__consumer_offsets=TOPIC_AUTHORIZATION_FAILED}
(org.apache.kafka.clients.NetworkClient)
[2016-07-07 22:48:06,318] ERROR Unknown error when running consumer:
 (kafka.tools.ConsoleConsumer$)
org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized
to access topics: [__consumer_offsets]
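
The failure mode described above can be modelled with a toy Python sketch.
This is not the consumer's actual code: the function names, the exception
class and the topic names are all hypothetical, and the "server side" variant
only illustrates the idea of filtering by authorization before matching.

```python
import re


class TopicAuthorizationError(Exception):
    """Stand-in for the client's TopicAuthorizationException."""


def subscribe_client_side(pattern, all_topics, authorized):
    """Fetch metadata for every topic, then match: any denied topic fails the call."""
    denied = sorted(t for t in all_topics if t not in authorized)
    if denied:
        raise TopicAuthorizationError("Not authorized to access topics: %s" % denied)
    return sorted(t for t in all_topics if re.fullmatch(pattern, t))


def subscribe_server_side(pattern, all_topics, authorized):
    """Match only among topics the caller may see, so denied topics are skipped."""
    return sorted(t for t in all_topics if t in authorized and re.fullmatch(pattern, t))
```

With topics such as "orders", "orders-retry" and "__consumer_offsets", and
access to only the first two, the first function raises while the second
returns the two matching topics.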


Thanks,
Manikumar


Re: Log retention just for offset topic

2016-06-29 Thread Manikumar Reddy
Hi,

Kafka internally creates the offsets topic (__consumer_offsets) with
compact mode on.
From 0.9.0.1 onwards, log.cleaner.enable=true by default. This means topics
with cleanup.policy=compact will now be compacted by default.

You can tweak the offsets topic configuration by using the props below:
offsets.topic.compression.codec
offsets.topic.num.partitions
offsets.topic.replication.factor
offsets.topic.segment.bytes
offsets.retention.minutes
offsets.retention.check.interval.ms
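
As a rough illustration, the Python sketch below renders a few of these props
as server.properties lines and rejects anything that is not in the list above.
The function name is hypothetical and the values in the usage example are
arbitrary, not recommendations.

```python
# The broker props named in this message.
OFFSETS_TOPIC_PROPS = {
    "offsets.topic.compression.codec",
    "offsets.topic.num.partitions",
    "offsets.topic.replication.factor",
    "offsets.topic.segment.bytes",
    "offsets.retention.minutes",
    "offsets.retention.check.interval.ms",
}


def render_offsets_overrides(overrides):
    """Render overrides as key=value lines, refusing unknown keys."""
    unknown = sorted(set(overrides) - OFFSETS_TOPIC_PROPS)
    if unknown:
        raise ValueError("not an offsets topic prop: %s" % ", ".join(unknown))
    return "\n".join("%s=%s" % (key, overrides[key]) for key in sorted(overrides))


if __name__ == "__main__":
    # Arbitrary example values: keep committed offsets for 7 days.
    print(render_offsets_overrides({"offsets.retention.minutes": 10080}))
```

These are broker-level settings, so they would go into each broker's
server.properties rather than into per-topic configuration.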


Thanks
Manikumar

On Thu, Jun 30, 2016 at 9:49 AM, Sathyakumar Seshachalam <
sathyakumar_seshacha...@trimble.com> wrote:

> I am a little confused about how the log cleaner works. My use case is that I want
> to compact just selected topics (or in my case just the internal topic
> __consumer_offsets, leaving other topics as is).
>
> What are the right settings/configuration for this to happen?
>
> As I understand log cleaner enable/disable is a global setting. And my
> understanding is that they will clean all logs (compact logs based on
> cleanup policy), and so all topics' clean up policy will be considered and
> hence compacted - compact being the default policy. Is this correct ?
>
> I have set all topics' retention duration to a really exorbitantly high
> value. Does it mean __consumer_offsets won't be compacted at all? If so,
> how do I set the retention time just for the offsets topic, given that it is
> an internal topic?
>
> Regards,
> Sathya
>


Re: [DISCUSS] Java 8 as a minimum requirement

2016-06-17 Thread Manikumar Reddy
I agree with Harsha and Marcus. Many Kafka users are still on Java 7, and
some of them will definitely upgrade to newer versions. We may need to support
Java 7 for a while.
We can remove the support from the next major version onwards.

Thanks,
Manikumar

On Fri, Jun 17, 2016 at 2:04 PM, Marcus Gründler  wrote:

> -1
> Hi Ismael,
>
> Although I really like the Java 8 features and understand the advantages
> you
> mentioned about Java 8 migration, I would suggest to stay with Java 7 as
> a minimum requirement for a while.
>
> I think there are two aspects to consider - Kafka Server and Kafka
> clients. On
> the server part it would make sense to switch to Java 8 because you can run
> the broker independently from any enclosing runtime (no JEE server etc.)
>
> But if you change the requirement for Kafka clients, you would cut Kafka
> support for quite a lot of real world deployments that run for example on
> an IBM WebSphere JEE Server (*sigh*). Since up to today there is no
> WebSphere version that supports Java 8.
>
> And I think a split of Kafka server with Java8 and Kafka client JARs in
> Java7
> would be too complicated to maintain.
>
> So my conclusion is - stay with Java 7 for a while.
>
> Regards, Marcus
>
>
> > On 16.06.2016 at 22:45, Ismael Juma wrote:
> >
> > Hi all,
> >
> > I would like to start a discussion on making Java 8 a minimum requirement
> > for Kafka's next feature release (let's say Kafka 0.10.1.0 for now). This
> > is the first discussion on the topic so the idea is to understand how
> > people feel about it. If people feel it's too soon, then we can pick up
> the
> > conversation again after Kafka 0.10.1.0. If the feedback is mostly
> > positive, I will start a vote thread.
> >
> > Let's start with some dates. Java 7 hasn't received public updates since
> > April 2015[1], Java 8 was released in March 2014[2] and Java 9 is
> scheduled
> > to be released in March 2017[3].
> >
> > The first argument for dropping support for Java 7 is that the last
> public
> > release by Oracle contains a large number of known security
> > vulnerabilities. The effectiveness of Kafka's security features is
> reduced
> > if the underlying runtime is not itself secure.
> >
> > The second argument for moving to Java 8 is that it adds a number of
> > compelling features:
> >
> > * Lambda expressions and method references (particularly useful for the
> > Kafka Streams DSL)
> > * Default methods (very useful for maintaining compatibility when adding
> > methods to interfaces)
> > * java.util.stream (helpful for making collection transformations more
> > concise)
> > * Lots of improvements to java.util.concurrent (CompletableFuture,
> > DoubleAdder, DoubleAccumulator, StampedLock, LongAdder, LongAccumulator)
> > * Other nice things: SplittableRandom, Optional (and many others I have
> not
> > mentioned)
> >
> > The third argument is that it will simplify our testing matrix, we won't
> > have to test with Java 7 any longer (this is particularly useful for
> system
> > tests that take hours to run). It will also make it easier to support
> Scala
> > 2.12, which requires Java 8.
> >
> > The fourth argument is that many other open-source projects have taken
> the
> > leap already. Examples are Cassandra[4], Lucene[5], Akka[6], Hadoop 3[7],
> > Jetty[8], Eclipse[9], IntelliJ[10] and many others[11]. Even Android will
> > support Java 8 in the next version (although it will take a while before
> > most phones will use that version sadly). This reduces (but does not
> > eliminate) the chance that we would be the first project that would
> cause a
> > user to consider a Java upgrade.
> >
> > The main argument for not making the change is that a reasonable number
> of
> > users may still be using Java 7 by the time Kafka 0.10.1.0 is released.
> > More specifically, we care about the subset who would be able to upgrade
> to
> > Kafka 0.10.1.0, but would not be able to upgrade the Java version. It
> would
> > be great if we could quantify this in some way.
> >
> > What do you think?
> >
> > Ismael
> >
> > [1] https://java.com/en/download/faq/java_7.xml
> > [2] https://blogs.oracle.com/thejavatutorials/entry/jdk_8_is_released
> > [3] http://openjdk.java.net/projects/jdk9/
> > [4] https://github.com/apache/cassandra/blob/trunk/README.asc
> > [5] https://lucene.apache.org/#highlights-of-this-lucene-release-include
> > [6] http://akka.io/news/2015/09/30/akka-2.4.0-released.html
> > [7] https://issues.apache.org/jira/browse/HADOOP-11858
> > [8] https://webtide.com/jetty-9-3-features/
> > [9] http://markmail.org/message/l7s276y3xkga2eqf
> > [10]
> >
> https://intellij-support.jetbrains.com/hc/en-us/articles/206544879-Selecting-the-JDK-version-the-IDE-will-run-under
> > [11] http://markmail.org/message/l7s276y3xkga2eqf
>
> --
>
> aixigo AG - einfach. besser. beraten
> Karl-Friedrich-Straße 68, 52072 Aachen, Germany
> fon: +49 (0)241 559709-43, fax: +49 (0)241 559709-99
> eMail: 

Re: Automatic Broker Id Generation

2016-05-20 Thread Manikumar Reddy
Yes.

On Fri, May 20, 2016 at 1:44 PM, Muqtafi Akhmad <muqt...@traveloka.com>
wrote:

> Hello Manikumar,
>
> Thank you for pointing that out, so the the broker id generation is done
> only once when broker started for the first time and there is no prior
> broker id that stored in meta.properties. Am i right?
>
> Thanks
>
> On Thu, May 19, 2016 at 7:14 PM, Manikumar Reddy <
> manikumar.re...@gmail.com>
> wrote:
>
> > Auto broker id generation logic:
> > 1. If there is a user provided broker.id, then it is used and id range
> is
> > from 0 to reserved.broker.max.id
> > 2. If there is no user provided broker.id, then auto id generation
> starts
> > from reserved.broker.max.id +1
> > 3. broker.id is stored in meta.properties file under each  log directory
> > (log.dirs).
> > 4. During server restart, broker reads broker.id from meta.properties(if
> > any) and validates
> >with user provided broker.id (if any) and retains the same id.
> >
> > you can read javadocs for more details
> >
> >
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/KafkaServer.scala#L632
> >
> >
> > On Thu, May 19, 2016 at 3:28 PM, Muqtafi Akhmad <muqt...@traveloka.com>
> > wrote:
> >
> > > Dear Kafka users,
> > > I have two questions about automatic broker id generation when
> > > broker.id.generation.enable = true,
> > > (1) is there any documentation about how broker id generated? is it
> > > incremental id starting from 0 that limited to reserved.broker.max.id?
> > > will
> > > broker id be reusable?
> > > (2) afaik broker id should not be changed once broker has been
> > successfully
> > > started, will a broker retain the same broker id after restarted?
> > >
> > > Thank you,
> > >
> > > --
> > > Muqtafi Akhmad
> > > Software Engineer
> > > Traveloka
> > >
> >
>
>
>
> --
> Muqtafi Akhmad
> Software Engineer
> Traveloka
>


Re: [COMMERCIAL] Re: [COMMERCIAL] Re: download - 0.10.0.0 RC6

2016-05-19 Thread Manikumar Reddy
Hi,

commitId is simply the latest git commit hash of the release. It is captured
while building the binary distribution. commitId is available in the binary
release (kafka_2.10-0.10.0.0.tgz).
commitId will not be available if you build from the source release
(kafka-0.10.0.0-src.tgz).


On Wed, May 18, 2016 at 11:11 PM, Ramanan, Buvana (Nokia - US) <
buvana.rama...@nokia.com> wrote:

> OK. Went ahead with source installation.
>
> Now I see the version to be 0.10.0.0 in MBeans, but the commitID is still
> unknown.
> Is that expected?
>
> Also, I expected the RC to be available in the version information.
>
> -Original Message-
> From: Ian Wrigley [mailto:i...@confluent.io]
> Sent: Wednesday, May 18, 2016 1:32 PM
> To: users@kafka.apache.org
> Subject: [COMMERCIAL] Re: [COMMERCIAL] Re: download - 0.10.0.0 RC6
>
> [Removing dev@ to avoid cross-posting]
>
> The second and third are binaries build against two different versions of
> Scala.
>
> Ian.
>
> > On May 18, 2016, at 1:28 PM, Ramanan, Buvana (Nokia - US) <
> buvana.rama...@nokia.com> wrote:
> >
> > Ian,
> >
> > Thanks a lot for the prompt response.
> >
> > What is the difference between the following?
> >
> > 1) kafka-0.10.0.0-src.tgz
> > 2) kafka_2.10-0.10.0.0.tgz
> > 3) kafka_2.11-0.10.0.0.tgz
> >
> > I suppose the 1st one is the source and the 2nd & 3rd are binaries...
> > I am not able to figure out the difference between 2nd & 3rd, please
> clarify
> >
> > -Buvana
> >
> >
>
>


Re: Automatic Broker Id Generation

2016-05-19 Thread Manikumar Reddy
Auto broker id generation logic:
1. If there is a user-provided broker.id, it is used; the valid id range is
from 0 to reserved.broker.max.id.
2. If there is no user-provided broker.id, auto id generation starts
from reserved.broker.max.id + 1.
3. broker.id is stored in the meta.properties file under each log directory
(log.dirs).
4. During server restart, the broker reads broker.id from meta.properties (if
any), validates it
   against the user-provided broker.id (if any), and retains the same id.

You can read the javadocs for more details:
https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/KafkaServer.scala#L632
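
The four rules above can be sketched in a few lines of plain Java. This is
only an illustration of the decision order, not the broker's actual code:
the class BrokerIdResolver, the NONE marker, and the "sequence" parameter
(a stand-in for whatever counter the cluster hands out) are all
hypothetical names made up for this example.

```java
public class BrokerIdResolver {
    /** Marker meaning "no id available" in this sketch (hypothetical). */
    public static final int NONE = -1;

    /**
     * @param configuredId  broker.id from the server config, or NONE
     * @param storedId      broker.id found in meta.properties, or NONE
     * @param reservedMaxId value of reserved.broker.max.id
     * @param sequence      next generated sequence number, starting at 1
     */
    public static int resolve(int configuredId, int storedId,
                              int reservedMaxId, int sequence) {
        // Rule 1: a user-provided id must lie in [0, reserved.broker.max.id].
        if (configuredId != NONE
                && (configuredId < 0 || configuredId > reservedMaxId)) {
            throw new IllegalArgumentException(
                "broker.id must be between 0 and " + reservedMaxId);
        }
        // Rule 4: on restart, a configured id must match the stored one.
        if (configuredId != NONE && storedId != NONE
                && configuredId != storedId) {
            throw new IllegalStateException(
                "configured broker.id " + configuredId
                + " does not match stored broker.id " + storedId);
        }
        if (configuredId != NONE) {
            return configuredId;          // Rule 1: user-provided id wins
        }
        if (storedId != NONE) {
            return storedId;              // Rules 3 and 4: keep the same id
        }
        // Rule 2: generated ids start at reserved.broker.max.id + 1.
        return reservedMaxId + sequence;
    }

    public static void main(String[] args) {
        System.out.println(resolve(5, NONE, 1000, 1));     // 5
        System.out.println(resolve(NONE, NONE, 1000, 1));  // 1001
        System.out.println(resolve(NONE, 1001, 1000, 7));  // 1001
    }
}
```

The third call shows the restart case: once an id has been written to
meta.properties, it is reused and no new id is generated.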


On Thu, May 19, 2016 at 3:28 PM, Muqtafi Akhmad 
wrote:

> Dear Kafka users,
> I have two questions about automatic broker id generation when
> broker.id.generation.enable = true,
> (1) is there any documentation about how broker id generated? is it
> incremental id starting from 0 that limited to reserved.broker.max.id?
> will
> broker id be reusable?
> (2) afaik broker id should not be changed once broker has been successfully
> started, will a broker retain the same broker id after restarted?
>
> Thank you,
>
> --
> Muqtafi Akhmad
> Software Engineer
> Traveloka
>


Re: client.id, v9 consumer, metrics, JMX and quotas

2016-05-11 Thread Manikumar Reddy
Hi,

This is a known issue. See the links below for related discussion:

https://issues.apache.org/jira/browse/KAFKA-3494
https://qnalist.com/questions/6420696/discuss-mbeans-overwritten-with-identical-clients-on-a-single-jvm


Manikumar

On Wed, May 11, 2016 at 7:29 PM, Paul Mackles  wrote:

> Hi
>
>
> I have an app that spins up multiple threads in a single JVM. Each thread
> has its own v9 consumer running under different groupIds.
>
>
> Since they are part of the same application, I set the client.id property
> for all 3 consumers to "frylock".
>
>
> Everything runs OK but I do see the following exception logged as a
> warning during startup:
>
>
> 2016-05-10 06:45:12,704 WARN [nioEventLoopGroup-3-3]
> org.apache.kafka.common.utils.AppInfoParser: Error registering AppInfo mbean
>
> javax.management.InstanceAlreadyExistsException:
> kafka.consumer:type=app-info,id=frylock
>
>
> From what I can gather, this error occurs when the consumer tries to
> register the metrics with JMX.
>
>
> I also noticed that if I don't set client.id explicitly, the API will
> generate  a unique client.id for each consumer.
>
>
> So as far as the metrics are concerned, it seems like the API wants you to
> set client.id to a unique value for all consumers running in the same JVM.
>
>
> At some point, I plan to make use of quotas. As I understand it, quotas
> are based on client.id. If so, it seems like the metrics handling in the
> consumer API is incompatible with quotas because for quotas to work, you
> would want all of the consumers that are part of the same app to use the
> same client.id (even if they are running in the same JVM).
>
>
> For my application, quotas are more important so my plan is to set
> client.id=frylock for all consumers and then write-off/ignore the per
> client-id JMX beans since they are probably not accurate. I guess I can
> still get individual consumer metrics through the Consumer.metrics() if I
> need them.
>
>
> Anyone have any better ideas? Am I missing something? Is this
> inconsistency worth filing a ticket over?
>
>
> Thanks,
>
> Paul
>
>


Re: How to work around log compaction error (0.8.2.2)

2016-04-27 Thread Manikumar Reddy
Hi,

 Are you enabling log compaction on a topic with compressed messages?
 If yes, that might be the reason for the exception. Log compaction in
 0.8.2.2 does not support compressed messages. This was fixed in 0.9.0.0
 (KAFKA-1641, KAFKA-1374).

See the mail thread below for some corrective actions:
http://grokbase.com/t/kafka/users/159jbe18en/log-cleaner-thread-stops


On Thu, Apr 28, 2016 at 1:44 AM, Rakesh Vidyadharan <
rvidyadha...@gracenote.com> wrote:

> Hello,
>
> We enabled log compaction on a few topics, as we want to preserve
> permanently the latest versions of messages published to specific topics.
> After enabling compaction, the log cleaner thread dies with the same error
> for the topics we tried it on.  It looks like kafka has starting offset
> that does not exist in the topic (at least that is how I am reading the
> error).  Any ideas on how we can work around this error?
>
> Thanks
> Rakesh
>
> [2016-04-27 15:52:11,306] INFO [kafka-log-cleaner-thread-0], Starting
> (kafka.log.LogCleaner)
> [2016-04-27 15:52:11,322] INFO Cleaner 0: Beginning cleaning of log
> metamorphosis.lineup-0. (kafka.log.LogCleaner)
> [2016-04-27 15:52:11,323] INFO Cleaner 0: Building offset map for
> metamorphosis.lineup-0... (kafka.log.LogCleaner)
> [2016-04-27 15:52:11,415] INFO Cleaner 0: Building offset map for log
> metamorphosis.lineup-0 for 1 segments in offset range [1553258, 2138466).
> (kafka.log.LogCleaner)
> [2016-04-27 15:52:11,435] ERROR [kafka-log-cleaner-thread-0], Error due
> to  (kafka.log.LogCleaner)
> java.lang.IllegalArgumentException: requirement failed: Last clean offset
> is 1553258 but segment base offset is 2125968 for log
> metamorphosis.lineup-0.
> at scala.Predef$.require(Predef.scala:233)
> at kafka.log.Cleaner.buildOffsetMap(LogCleaner.scala:509)
> at kafka.log.Cleaner.clean(LogCleaner.scala:307)
> at kafka.log.LogCleaner$CleanerThread.cleanOrSleep(LogCleaner.scala:221)
> at kafka.log.LogCleaner$CleanerThread.doWork(LogCleaner.scala:199)
> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
> [2016-04-27 15:52:11,436] INFO [kafka-log-cleaner-thread-0], Stopped
> (kafka.log.LogCleaner)
>
>


Re: Best Guide/link for Kafka Ops work

2016-04-21 Thread Manikumar Reddy
This book can help you:
Kafka: The Definitive Guide (http://shop.oreilly.com/product/0636920044123.do)

On Thu, Apr 21, 2016 at 9:38 PM, Mudit Agarwal 
wrote:

> Hi,
> Any recommendations for any online guide/link on managing/Administration
> of kafka cluster.
> Thanks,Mudit


Re: Compaction does not seem to kick in

2016-04-21 Thread Manikumar Reddy
Did you set the broker config property log.cleanup.policy=compact or the
topic-level property cleanup.policy=compact?

On Thu, Apr 21, 2016 at 7:16 PM, Kasim Doctor  wrote:

> Hi everyone,
>
> I have a cluster of 5 brokers with Kafka 2.10_0.8.2.1 and one of the
> topics compacted (out of a total of 4 topics).
>
> In spite of repeated attempts to tune the configuration, compaction just
> does not work. The messages that I push to this compacted topic are in the
> range of 50 MB to 90 MB.
>
> Here are my broker configs related to compaction:
>
>
>
> log.cleaner.enable=true
>
>
>
> log.cleaner.backoff.ms=15000
>
>
> log.cleaner.delete.retention.ms=8640
>
>
> log.cleaner.threads=5
>
>
> log.cleaner.dedupe.buffer.size=524288000
>
>
> log.cleaner.min.cleanable.ratio=0.01
>
>
>
>
>
> Additionally, I have set segment.ms = 8640 at the topic level for the
> specific compacted topic.
>
> I also don’t see any logs related to compaction in the kafka logs.
>
> As a result, I run of out of disk space in a matter of days since I was
> relying on compaction to free up space by deleting old entries in the log
> with the same key.
>
> Can someone please help me out here ?
>
> Thanks,
> Kasim
>


Re: Metrics for Log Compaction

2016-04-15 Thread Manikumar Reddy
Hi,

The JMX metric object names related to log compaction are given below.

kafka.log:type=LogCleaner,name=cleaner-recopy-percent
kafka.log:type=LogCleaner,name=max-buffer-utilization-percent
kafka.log:type=LogCleaner,name=max-clean-time-secs
kafka.log:type=LogCleanerManager,name=max-dirty-percent


After every compaction cycle, we also print some useful statistics to
the logs/log-cleaner.log file.
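
A minimal sketch of reading these gauges over remote JMX with the standard
javax.management API. Several things here are assumptions rather than facts
from this thread: that the broker was started with remote JMX enabled, that
the JMX service URL is passed as the first argument, and that each gauge
exposes its reading as an attribute named "Value". The class name
LogCleanerMetrics is made up for the example.

```java
import java.util.Arrays;
import java.util.List;
import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

public class LogCleanerMetrics {
    // Object names copied from the list above.
    public static final List<String> GAUGES = Arrays.asList(
        "kafka.log:type=LogCleaner,name=cleaner-recopy-percent",
        "kafka.log:type=LogCleaner,name=max-buffer-utilization-percent",
        "kafka.log:type=LogCleaner,name=max-clean-time-secs",
        "kafka.log:type=LogCleanerManager,name=max-dirty-percent");

    // Reads one gauge. Assumption: the reading is the "Value" attribute.
    public static Object readGauge(MBeanServerConnection conn, String name)
            throws Exception {
        return conn.getAttribute(new ObjectName(name), "Value");
    }

    public static void main(String[] args) throws Exception {
        if (args.length == 0) {
            // No JMX URL given: just list the names this sketch would query.
            for (String name : GAUGES) {
                System.out.println(name);
            }
            return;
        }
        // Example argument (hypothetical host and port):
        //   service:jmx:rmi:///jndi/rmi://localhost:9999/jmxrmi
        JMXServiceURL url = new JMXServiceURL(args[0]);
        try (JMXConnector connector = JMXConnectorFactory.connect(url)) {
            MBeanServerConnection conn = connector.getMBeanServerConnection();
            for (String name : GAUGES) {
                System.out.println(name + " = " + readGauge(conn, name));
            }
        }
    }
}
```

Polling these values periodically gives a rough view of how hard the cleaner
is working; the per-cycle details still come from the log file.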


On Wed, Apr 13, 2016 at 7:16 PM, Kasim Doctor  wrote:

> Hi everyone,
>
> We are starting to use log compaction for one of our topics and I was
> wondering if there are any specific metrics exposed to monitor how often
> compaction took place and/or how many records (with some metadata related
> to partition) were deleted ?
>
> I looked at JMX metrics that Kafka exposes but could not find specific
> metrics related to compaction.
>
> Any insight or help would be appreciated.
>
> Thanks,
> Kasim
>
>
>


Re: Metrics for Log Compaction

2016-04-15 Thread Manikumar Reddy
Hi,


kafka.log:type=LogCleaner,name=cleaner-recopy-percent
kafka.log:type=LogCleanerManager,name=max-dirty-percent
kafka.log:type=LogCleaner,name=max-clean-time-secs



After every compaction cycle, we also print some useful statistics to
the logs/log-cleaner.log file.


On Wed, Apr 13, 2016 at 7:16 PM, Kasim Doctor  wrote:

> Hi everyone,
>
> We are starting to use log compaction for one of our topics and I was
> wondering if there are any specific metrics exposed to monitor how often
> compaction took place and/or how many records (with some metadata related
> to partition) were deleted ?
>
> I looked at JMX metrics that Kafka exposes but could not find specific
> metrics related to compaction.
>
> Any insight or help would be appreciated.
>
> Thanks,
> Kasim
>
>
>


Re: Control the amount of messages batched up by KafkaConsumer.poll()

2016-04-12 Thread Manikumar Reddy
Yes.


Manikumar

On Tue, Apr 12, 2016 at 6:59 PM, Oleg Zhurakousky <
ozhurakou...@hortonworks.com> wrote:

> Thanks Manikumar
>
> So, but for now I guess the only way to at least influence it (as I’ve
> observed) would be ‘max.partition.fetch.bytes’, correct?
>
> Cheers
> Oleg
> > On Apr 12, 2016, at 9:22 AM, Manikumar Reddy <manikumar.re...@gmail.com>
> wrote:
> >
> > New consumer config property "max.poll.records"  is getting  introduced
> in
> > upcoming 0.10 release.
> > This property can be used to control the no. of records in each poll.
> >
> >
> > Manikumar
> >
> > On Tue, Apr 12, 2016 at 6:26 PM, Oleg Zhurakousky <
> > ozhurakou...@hortonworks.com> wrote:
> >
> >> Is there a way to specify in KafkaConsumer up to how many messages do I
> >> want o receive? I am operation under premise that Consumer.poll(..)
> returns
> >> a batch of messages, but not sure if there is a way to control batch
> amount.
> >>
> >> Cheers
> >> Oleg
> >>
>
>


Re: Control the amount of messages batched up by KafkaConsumer.poll()

2016-04-12 Thread Manikumar Reddy
A new consumer config property, "max.poll.records", is being introduced in the
upcoming 0.10 release.
This property can be used to control the number of records returned by each
poll.
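
As a rough sketch, the property would be set alongside the usual consumer
settings. The example below only builds the java.util.Properties object, so
it runs without the Kafka client jar; the class name BoundedPollConfig, the
bootstrap address, and the group id are placeholders invented for this
example.

```java
import java.util.Properties;

public class BoundedPollConfig {
    // Builds consumer settings that cap the records returned per poll.
    public static Properties consumerProps(String bootstrapServers,
                                           String groupId,
                                           int maxPollRecords) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", groupId);
        props.put("key.deserializer",
            "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
            "org.apache.kafka.common.serialization.StringDeserializer");
        // Upper bound on the number of records a single poll() returns.
        props.put("max.poll.records", Integer.toString(maxPollRecords));
        return props;
    }

    public static void main(String[] args) {
        // Placeholder values; pass the result to the consumer constructor.
        Properties props = consumerProps("localhost:9092", "example-group", 100);
        System.out.println(props.getProperty("max.poll.records"));
    }
}
```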


Manikumar

On Tue, Apr 12, 2016 at 6:26 PM, Oleg Zhurakousky <
ozhurakou...@hortonworks.com> wrote:

> Is there a way to specify in KafkaConsumer up to how many messages do I
> want o receive? I am operation under premise that Consumer.poll(..) returns
> a batch of messages, but not sure if there is a way to control batch amount.
>
> Cheers
> Oleg
>


Re: KafkaProducer Retries in .9.0.1

2016-04-05 Thread Manikumar Reddy
Hi,

 Producer message size validation checks ("buffer.memory",
"max.request.size") happen before
 batching and sending messages, so a failure there is not retried. The retry
mechanism applies to broker-side errors and network errors.
Try changing the "message.max.bytes" broker config property to simulate a
broker-side error.






On Wed, Apr 6, 2016 at 9:53 AM, christopher palm  wrote:

> Hi All,
>
> I am working with the KafkaProducer using the properties below,
> so that the producer keeps trying to send upon failure on Kafka .9.0.1.
> I am forcing a failure by setting my buffersize smaller than my
> payload,which causes the expected exception below.
>
> I don't see the producer retry to send on receiving this failure.
>
> Am I  missing something in the configuration to allow the producer to retry
> on failed sends?
>
> Thanks,
> Chris
>
> .java.util.concurrent.ExecutionException:
> org.apache.kafka.common.errors.RecordTooLargeException: The message is 8027
> bytes when serialized which is larger than the total memory buffer you have
> configured with the buffer.memory configuration.
>
>  props.put("bootstrap.servers", bootStrapServers);
>
> props.put("acks", "all");
>
> props.put("retries", 3);//Try for 3 strikes
>
> props.put("batch.size", batchSize);//Need to see if this number should
> increase under load
>
> props.put("linger.ms", 1);//After 1 ms fire the batch even if the batch
> isn't full.
>
> props.put("buffer.memory", buffMemorySize);
>
> props.put("max.block.ms",500);
>
> props.put("max.in.flight.requests.per.connection", 1);
>
> props.put("key.serializer",
> "org.apache.kafka.common.serialization.StringSerializer");
>
> props.put("value.serializer",
> "org.apache.kafka.common.serialization.ByteArraySerializer");
>


Re: consumer too fast

2016-03-31 Thread Manikumar Reddy
Hi,

1. A new config property, "max.poll.records", is being introduced in the
upcoming 0.10 release.
   This property can be used to control the number of records returned by
   each poll.

2. We can use a combination of an ExecutorService/processing thread and the
pause/resume API to avoid unwanted rebalances.
Some of these options are discussed here
http://users.kafka.apache.narkive.com/4vvhuBZO/low-latency-high-message-size-variance

Example code is here
https://github.com/omkreddy/kafka-examples/blob/master/consumer/src/main/java/kafka/examples/consumer/advanced/AdvancedConsumer.java

On Thu, Mar 31, 2016 at 3:43 PM, Daniel Fanjul <
daniel.fanjul.alcu...@gmail.com> wrote:

> Hi all,
>
> My problem: If the consumer fetches too much data and the processing of the
> records is not fast enough, commit() fails because there was a rebalance.
>
> I cannot reduce 'max.partition.fetch.bytes' because there might be large
> messages.
>
> I don't want to increase the 'session.timeout.ms', because it would be too
> large to detect failures.
>
> I understand that the new consumer API only sends the heartbeats and
> manages rebalances during the call to poll(). But if I call poll(0), there
> is still a chance it will return even more data. So I keep the heart beats,
> but I may accumulate too much data, eventually leading to OOM.
>
> I would like something:
> foreach record in consumer.poll() {
>   process(record)
>   consumer.doHeartBeatsAndRebalanceSoKeepMeStillAlive()
> }
>
> Is this possible?
>


Re: Is it safe to send messages to Kafka when one of the brokers is down?

2016-03-28 Thread Manikumar Reddy
Hi,

 1. Your topic partitions are not replicated (replication factor = 1).
Increase the replication factor for better fault tolerance.
  With proper replication, Kafka brokers/producers can handle node
failures without data loss.

 2. It looks like the Kafka brokers are not in one cluster. They might be
configured with different ZooKeeper clusters. All Kafka servers should be
configured with the same ZooKeeper cluster. Check your ZK cluster.

On Tue, Mar 29, 2016 at 4:57 AM, Eric Hyunwoo Na  wrote:

> My company has been running Kafka on a three-node cluster. Let us call the
> nodes master, slave1, slave2.
>
> All three nodes are running Kafka.
>
> However, I found out I/O is screwed up in the mounting partition (`/mnt/`)
> on the master broker, and even the root cannot read, write, or execute any
> of the file there. It is strange how Kafka is still running.
>
> The other two brokers are fine, but I think only one of them is actually
> functioning.
>
> I want to replace the corrupted disk on the master, and then re-enable
> Kafka on the master.
>
> From my understanding, when I kill Kafka on the master, one of the
> followers will elect itself as a leader, and it should work fine.
>
> My concern is,
>
> 1. Keeping sending messages to master when Kafka is off might mess up the
> master server. (I pass a comma separated list of all three brokers in the
> consumer, but I need to make sure it's safe)
>
> 2. The cluster might be poorly configured so that it's not a three-node
> cluster, but actually three one-node cluster, and mater would not be
> fault-tolerant.
>
>  For example, on slave 1,
>
> $ bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic
> metric.topic
> Topic:metric.topic PartitionCount:1 ReplicationFactor:1 Configs:
> Topic: metric.topic Partition: 0 Leader: 1 Replicas: 1 Isr: 1
>
>   on slave 2,
>
> Topic:metric.topic PartitionCount:1 ReplicationFactor:1 Configs:
>   Topic: metric.topic Partition: 0 Leader: 2 Replicas: 2 Isr: 2
>
>   (I cannot check this for master, because of I/O permission is messed up
> there)
>
>   These two seem to run separately, although they receive the same messages
> from the producers.
>
> How can I make sure these two things would not happen?
>
> Especially, where in the Kafka documentation are they addressing my concern
> #1?
>
> --
> Best,
>
> Eric
>


Re: Queue implementation

2016-03-28 Thread Manikumar Reddy
Yes, your scenarios are easy to implement using Kafka. Please go through the
Kafka documentation and examples for a better
understanding of Kafka concepts, use cases and design.

https://kafka.apache.org/documentation.html
https://github.com/apache/kafka/tree/trunk/examples

On Tue, Mar 29, 2016 at 9:20 AM, Vinod Kakad  wrote:

> Can we have multiple topics and multiple consumers but in a queue
> implementation.
>
> for e.g.
> There are 2 topics T1 and T2 and four consumers C1,C2,C3 and C4.
>
> can we have C1,C2,C3 subscribe to T1 and (C3,C4) subscribe to T2.  and this
> should work like a queue. i.e. T2 should send message to only C3 or C4. and
> same in case of T1.
>
> is this possible by any means?
>
> Thanks & Regards,
> Vinod Kakad.
>
> On Mon, Mar 28, 2016 at 8:16 PM, Gaurav Agarwal 
> wrote:
>
> > If you have consumer group id across multiple consumers then Kafka will
> > work as queue only .
> > On Mar 28, 2016 6:48 PM, "Sharninder"  wrote:
> >
> > > What kind of queue are you looking for? Kafka works as a nice FIFO
> queue
> > by
> > > default anyway.
> > >
> > >
> > >
> > > On Mon, Mar 28, 2016 at 5:19 PM, Vinod Kakad 
> > wrote:
> > >
> > > > Can anybody share any good example(code) for kafka as a queue
> > > > implementation?
> > > >
> > > > Thanks & Regards,
> > > > Vinod Kakad.
> > > >
> > >
> > >
> > >
> > > --
> > > --
> > > Sharninder
> > >
> >
>


Re: Custom serializer/deserializer for kafka 0.9.x version

2016-03-28 Thread Manikumar Reddy
Hi,

You need to implement the org.apache.kafka.common.serialization.Serializer
and org.apache.kafka.common.serialization.Deserializer
interfaces. The Encoder and Decoder interfaces are for the older clients.

Example code:
  https://github.com/omkreddy/kafka-examples/tree/master/consumer/src/main/java/kafka/examples/common/serialization
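
A minimal sketch of the byte conversion that such a Serializer/Deserializer
pair would wrap, assuming the bean implements java.io.Serializable and plain
Java serialization is acceptable. It deliberately uses only java.io so that
it runs without the Kafka client jar. The class name JavaObjectBytes and its
two methods are hypothetical helpers; in a real implementation, toBytes
would be called from serialize(String topic, T data) and fromBytes from
deserialize(String topic, byte[] data).

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class JavaObjectBytes {
    // Object -> bytes, the work a Serializer.serialize() body would do.
    public static byte[] toBytes(Serializable value) {
        try (ByteArrayOutputStream bytes = new ByteArrayOutputStream();
             ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
            out.flush(); // push buffered data into the byte array stream
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new IllegalStateException("could not serialize value", e);
        }
    }

    // Bytes -> object, the work a Deserializer.deserialize() body would do.
    public static Object fromBytes(byte[] data) {
        try (ObjectInputStream in =
                 new ObjectInputStream(new ByteArrayInputStream(data))) {
            return in.readObject();
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("could not deserialize value", e);
        }
    }

    public static void main(String[] args) {
        byte[] wire = toBytes("hello");
        System.out.println(fromBytes(wire)); // hello
    }
}
```

Java serialization is used here only because it needs no extra libraries;
any other encoding can be substituted inside the same two methods.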

On Tue, Mar 29, 2016 at 8:24 AM, Ratha v  wrote:

> Hi all;
>
> I have written my custom serialiser/deserializer to publish/consume my java
> bean objects.
> However i get class nor found exception for "kafka.serializer" packages.
> Can someone point me which class i have to use to implement my
> custom serializer in kafka 0.9.x?
>
> import kafka.serializer.Decoder;
> import kafka.serializer.Encoder;
> import kafka.utils.VerifiableProperties;
> public class FileSerializer implements Encoder, Decoder {
>
> public FileSerializer() {
>
> }
>
> public FileSerializer(VerifiableProperties verifiableProperties) {
> /* This constructor must be present for successful compile. */
> }
>
> @Override
> public byte[] toBytes(File file) {
> 
>
> }
>
> @Override
> public File fromBytes(byte[] fileContent) {
> 
> return (File) obj;
> }
>
> }
>
>
> I have added following pom dependency.
>
> <dependency>
>   <groupId>org.apache.kafka</groupId>
>   <artifactId>kafka-clients</artifactId>
>   <version>0.9.0.0</version>
> </dependency>
>
>
> Thanks
> --
> -Ratha
> http://vvratha.blogspot.com/
>


Re: Multiple Topics and Consumer Groups

2016-03-27 Thread Manikumar Reddy
A consumer can belong to only one consumer group.
https://kafka.apache.org/documentation.html#intro_consumers

On Mon, Mar 28, 2016 at 11:01 AM, Vinod Kakad  wrote:

> Hi,
>
> I wanted to know if same consumer can be in two consumer groups.
>
> OR
>
> How the multiple topic subscription for consumer group works.
>
>
> Thanks & Regards,
> Vinod Kakad.
>


Re: Offset after message deletion

2016-03-27 Thread Manikumar Reddy
It will continue from the latest offset. An offset is an increasing,
contiguous sequence number per partition; offsets are never reset or reused
after old messages are discarded.
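
A toy model of one partition may make this concrete. It is not Kafka code;
the class PartitionLogSketch and its methods are invented for illustration.
The point is that the "next offset" counter is kept separately from the
retained messages, so discarding old messages never moves it backwards.

```java
import java.util.ArrayDeque;
import java.util.Deque;

public class PartitionLogSketch {
    private final Deque<Long> retainedOffsets = new ArrayDeque<>();
    private long nextOffset = 0L;

    // Appends a message and returns the offset assigned to it.
    public long append() {
        long offset = nextOffset++;
        retainedOffsets.addLast(offset);
        return offset;
    }

    // Simulates retention: drops up to 'count' of the oldest messages.
    public void discardOldest(int count) {
        for (int i = 0; i < count && !retainedOffsets.isEmpty(); i++) {
            retainedOffsets.removeFirst();
        }
    }

    // Number of messages still stored.
    public int retainedCount() {
        return retainedOffsets.size();
    }

    public static void main(String[] args) {
        PartitionLogSketch log = new PartitionLogSketch();
        log.append();          // offset 0
        log.append();          // offset 1
        log.append();          // offset 2
        log.discardOldest(3);  // retention removes everything
        System.out.println(log.append()); // 3, not 0
    }
}
```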

On Mon, Mar 28, 2016 at 9:11 AM, Imre Nagi  wrote:

> Hi All,
>
> I'm new in kafka. So, I have a question related to kafka offset.
>
> From the kafka documentation in here
> , it said :
>
> > The Kafka cluster retains all published messages—whether or not they have
> > been consumed—for a configurable period of time. For example if the log
> > retention is set to two days, then for the two days after a message is
> > published it is available for consumption, after which it will be
> discarded
> > to free up space.
> >
> After the message has been discarded, let say that we receive some new
> message. What is the offset of the new message? Will it restart from 0 or
> continue the latest offset?
>
> Thanks,
> Imre
>


Re: Re: Topics in Kafka

2016-03-23 Thread Manikumar Reddy
Hi,

You can have a single topic with multiple partitions. It looks like you have
messages with key=region and value=sensor reading, and you want to run some
aggregate, windowing operations by time. Kafka Streams is a perfect fit for
this use case.
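
To show what a per-region, one-minute aggregation computes, here is a sketch
in plain Java. It does not use the Kafka Streams API; the class
RegionWindowSum and its methods are made up, and it assumes each reading
carries a region key, a millisecond timestamp, and a numeric value.

```java
import java.util.HashMap;
import java.util.Map;

public class RegionWindowSum {
    public static final long WINDOW_MS = 60_000L; // one-minute windows

    // Running sums keyed by "region@windowStartMs".
    private final Map<String, Double> sums = new HashMap<>();

    // Adds one reading to the window that contains its timestamp.
    public void add(String region, long timestampMs, double value) {
        long windowStart = timestampMs - (timestampMs % WINDOW_MS);
        sums.merge(region + "@" + windowStart, value, Double::sum);
    }

    // Returns the sum for a region in the window starting at windowStartMs.
    public double sum(String region, long windowStartMs) {
        return sums.getOrDefault(region + "@" + windowStartMs, 0.0);
    }

    public static void main(String[] args) {
        RegionWindowSum agg = new RegionWindowSum();
        agg.add("north", 1_000L, 2.0);   // first minute
        agg.add("north", 59_999L, 3.0);  // still the first minute
        agg.add("north", 60_000L, 5.0);  // second minute
        System.out.println(agg.sum("north", 0L));      // 5.0
        System.out.println(agg.sum("north", 60_000L)); // 5.0
    }
}
```

Keying messages by region matters because messages with the same key land in
the same partition, so a single consumer sees all readings for a region.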



On Wed, Mar 23, 2016 at 2:30 PM, Maria Musterfrau <daniela_4...@gmx.at>
wrote:

> Hi,
>
> Thank you for your reply.
>
> I would like to cluster the values (every message contains one value)
> after their region and to add the values up in realtime afterwards (the
> actual/last minute).
>
> I would like to use Kafka to subscribe the messages to a stream processing
> framework like Storm or Spark Streaming and to use a time window because I
> would like to add up the values of the last minute.
>
> Would it be better to use different topics for different regions or to use
> one topic and to do the clustering in Storm or Spark Streaming afterwards?
>
> Thank you in advance.
>
> Regards,
> Daniela
>
>
>
> Sent: Wednesday, 23 March 2016 at 09:42
> From: "Manikumar Reddy" <ku...@nmsworks.co.in>
> To: "users@kafka.apache.org" <users@kafka.apache.org>
> Subject: Re: Topics in Kafka
> Hi,
>
> 1. Based on your design, it can be one or more topics. You can design one
> topic per region or
> one topic for all region devices.
>
> 2. Yes, you need to listen to web socket messages and write to kafka
> server using kafka producer.
> In your use case, you can also send messages using Kafka Rest Proxy
> from confluent.
> http://docs.confluent.io/2.0.1/kafka-rest/docs/index.html
>
>
> On Wed, Mar 23, 2016 at 1:59 PM, Maria Musterfrau <daniela_4...@gmx.at>
> wrote:
>
> > Hi
> >
> > I am new and I have a question regarding Kafka. I have devices in
> > different regions. Every device opens a websocket connection when it gets
> > active and it sends its messages over the opened websocket connection to
> a
> > server. My question is: is every region a topic or is every websocket
> > connection (means every device) a topic? Or can I choose which one is
> used
> > as topic?
> >
> > Is it true that I need a piece of code as Kafka cannot consume messages
> > directly from the websocket connection?
> >
> > Thank you and regards,
> > Daniela
> >
>


Re: Topics in Kafka

2016-03-23 Thread Manikumar Reddy
Hi,

1. Depending on your design, it can be one or more topics. You can use one
topic per region or
   one topic for the devices of all regions.

2. Yes, you need to listen to the WebSocket messages and write them to the
Kafka server using a Kafka producer.
In your use case, you can also send messages using the Kafka REST Proxy
from Confluent.
http://docs.confluent.io/2.0.1/kafka-rest/docs/index.html


On Wed, Mar 23, 2016 at 1:59 PM, Maria Musterfrau 
wrote:

> Hi
>
> I am new and I have a question regarding Kafka. I have devices in
> different regions. Every device opens a websocket connection when it gets
> active and it sends its messages over the opened websocket connection to a
> server. My question is: is every region a topic or is every websocket
> connection (means every device) a topic? Or can I choose which one is used
> as topic?
>
> Is it true that I need a piece of code as Kafka cannot consume messages
> directly from the websocket connection?
>
> Thank you and regards,
> Daniela
>


Re: Reading data from sensors

2016-03-23 Thread Manikumar Reddy
Hi,

You can use the librdkafka C library to produce data.

https://github.com/edenhill/librdkafka


Manikumar

On Wed, Mar 23, 2016 at 12:41 PM, Shashidhar Rao  wrote:

> Hi,
>
> Can someone help me with reading data from sensors and storing into Kafka.
>
> At the moment the sensors data are read by a C program and the captured
> data is stored in Oracle.
>
> How data captured by C program at real time can be stored into Kafka. Is
> there a sample C Kafka producer which stores data into kafka.
>
> Thanks in advance.
>
> Regards
> Shashi
>


Re: Reg : Unable to produce message

2016-03-20 Thread Manikumar Reddy
You may see a few of these warnings on the first produce to a topic that
does not exist yet, when the broker runs with the default server config
property auto.create.topics.enable = true. If that is the case, the
warnings are harmless.

On Sun, Mar 20, 2016 at 11:19 AM, Mohamed Ashiq 
wrote:

> All,
>
> I am getting this error for few topics
> WARN Error while fetching metadata with correlation id 4802 :
> {next=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
> WARN Error while fetching metadata with correlation id 4803 :
> {next=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
>
> I am using Kafka 0.9.0.0 and getting this error for few topics while
> trying to produce messages. After googling I got answers to delete those
> topics and to recreate.
> But I can’t take that option since I would like to have the data and the
> state of available.
>
> We got this problem in our integration environment. We are very close to
> moving to production. It would be great if someone can help to understand
> this problem better.
>
> Regards,
> Ashiq.
>
>
>


Re: Larger Size Error Message

2016-03-19 Thread Manikumar Reddy
DumpLogSegments tool is used to dump partition data logs (not application
logs).

Usage:
./bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files
/tmp/kafka-logs/TEST-TOPIC-0/.log

Use  --key-decoder-class , --key-decoder-class options to pass
deserializers.

On Fri, Mar 18, 2016 at 12:17 PM, Fang Wong  wrote:

> Thanks Guozhang:
>
> I put server.log in the command line, got the the following error:
>
> -bash-4.1$ ./kafka-run-class.sh kafka.tools.DumpLogSegments --files
> /home/kafka/logs/server.log
> Dumping /home/sfdc/logs/liveAgent/kafka/logs/server.log
> Exception in thread "main" java.lang.NumberFormatException: For input
> string: "server"
> at
>
> java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
> at java.lang.Long.parseLong(Long.java:589)
> at java.lang.Long.parseLong(Long.java:631)
> at scala.collection.immutable.StringLike$class.toLong(StringLike.scala:230)
> at scala.collection.immutable.StringOps.toLong(StringOps.scala:31)
> at
>
> kafka.tools.DumpLogSegments$.kafka$tools$DumpLogSegments$$dumpLog(DumpLogSegments.scala:135)
> at
> kafka.tools.DumpLogSegments$$anonfun$main$1.apply(DumpLogSegments.scala:77)
> at
> kafka.tools.DumpLogSegments$$anonfun$main$1.apply(DumpLogSegments.scala:73)
> at
>
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
> at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
> at kafka.tools.DumpLogSegments$.main(DumpLogSegments.scala:73)
> at kafka.tools.DumpLogSegments.main(DumpLogSegments.scala)
>
> Also how to pass the right deserializers?
>
> Thanks,
> Fang
>
> On Wed, Mar 16, 2016 at 4:15 PM, Guozhang Wang  wrote:
>
> > Fang,
> >
> > You can use the kafka.tools.DumpLogSegments to scan and view the logs,
> but
> > you need the right deserializers to illustrate the content.
> >
> > Guozhang
> >
> > On Wed, Mar 16, 2016 at 4:03 PM, Fang Wong  wrote:
> >
> > > Thanks Guozhang!
> > > We are in the process of upgrading to 0.9.0.0. We will look into using
> > > ACLs.
> > >
> > > Is there a way to see what is the request in the kafka server, the
> > request
> > > for my case is byte[]? Is there a way to turn on kafka logging to see
> the
> > > request on the kafka server side?
> > >
> > > Thanks,
> > > Fang
> > >
> > > On Wed, Mar 16, 2016 at 12:03 PM, Guozhang Wang 
> > > wrote:
> > >
> > > > Before 0.9 and before anyone knows your server host / port can
> produce
> > > > request to you unless you have a hardware LB or firewall.
> > > >
> > > > In the recent release of 0.9, there is a Security feature added to
> > Kafka,
> > > > including encryption / authentication / authorization. For your
> case, I
> > > > would suggest you upgrade to 0.9 and use its authorization mechanism
> > > using
> > > > ACLs.
> > > >
> > > > Guozhang
> > > >
> > > >
> > > > On Wed, Mar 16, 2016 at 11:36 AM, Fang Wong 
> > > wrote:
> > > >
> > > > > Hi Guozhang,
> > > > >
> > > > > The problem is that server "10.225.36.226" is not one of my kafka
> > > > clients,
> > > > > nslookup shows it is another internal server, my servers are like
> > > > > 10.224.146.6 #, I can't even login to
> > that
> > > > > server. All of my messages are at most a few KB.
> > > > >
> > > > > Is it possible anybody within the internal network can send any
> > message
> > > > to
> > > > > kafka? How do I allow a list of fixed servers can send a request to
> > > kafka
> > > > > server?
> > > > >
> > > > > Thanks,
> > > > > Fang
> > > > >
> > > > >
> > > > > On Tue, Mar 15, 2016 at 5:31 PM, Guozhang Wang  >
> > > > wrote:
> > > > >
> > > > > > Fang,
> > > > > >
> > > > > > From the logs you showed above there is a single produce request
> > with
> > > > > very
> > > > > > large request size:
> > > > > >
> > > > > > "[2016-03-14 06:43:03,579] INFO Closing socket connection to
> > > > > > /10.225.36.226 due to invalid request: Request of length
> > *808124929*
> > > > is
> > > > > > not valid, it is larger than the maximum size of 104857600 bytes.
> > > > > > (kafka.network.Processor)"
> > > > > >
> > > > > > Which is about 770MB while the maximum request size is configured
> > as
> > > > > 100MB.
> > > > > > It is from the client hosted at "10.225.36.226", if you can go to
> > > that
> > > > > > server and checks the producer logs around that time, maybe you
> can
> > > > > > discover why there comes a single big produce request.
> > > > > >
> > > > > > Guozhang
> > > > > >
> > > > > >
> > > > > > On Mon, Mar 14, 2016 at 1:59 PM, Fang Wong  >
> > > > wrote:
> > > > > >
> > > > > > > After changing log level from INFO to TRACE, here is kafka
> > > > server.log:
> > > > > > >
> > > > > > > [2016-03-14 06:43:03,568] TRACE 156 bytes written.
> > > > > > > (kafka.network.BoundedByteBufferSend)
> > > > > > >
> > > > > > > [2016-03-14 06:43:03,575] TRACE 

Re: Larger Size Error Message

2016-03-19 Thread Manikumar Reddy
DumpLogSegments tool is used to dump partition data logs (not application
logs).

Usage:
./bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files
/tmp/kafka-logs/TEST-TOPIC-0/.log

Use  --key-decoder-class , --value-decoder-class options to pass
deserializers.
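
The NumberFormatException in the quoted stack trace (input string "server")
suggests that the tool parses the part of the file name before the first dot
as a number, which works for partition segment files because they are named
after their base offset. The following Java sketch only mimics that check; it
is not the tool's actual code, and the segment file name used in it is a
hypothetical example.

```java
/** Illustrative only: why a file such as server.log is rejected. */
final class SegmentNames {

    /**
     * Partition segment files are named after their base offset,
     * so the prefix before the first '.' must parse as a long.
     */
    static long baseOffset(String fileName) {
        int dot = fileName.indexOf('.');
        String prefix = dot < 0 ? fileName : fileName.substring(0, dot);
        // Throws NumberFormatException for a prefix such as "server".
        return Long.parseLong(prefix);
    }

    public static void main(String[] args) {
        // Hypothetical segment file name with base offset 42.
        System.out.println(baseOffset("00000000000000000042.log"));
        try {
            baseOffset("server.log");
        } catch (NumberFormatException e) {
            System.out.println("not a segment file: " + e.getMessage());
        }
    }
}
```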

On Fri, Mar 18, 2016 at 12:31 PM, Manikumar Reddy <ku...@nmsworks.co.in>
wrote:

> DumpLogSegments tool is used to dump partition data logs (not application
> logs).
>
> Usage:
> ./bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files
> /tmp/kafka-logs/TEST-TOPIC-0/.log
>
> Use  --key-decoder-class , --key-decoder-class options to pass
> deserializers.
>
> On Fri, Mar 18, 2016 at 12:17 PM, Fang Wong <fw...@salesforce.com> wrote:
>
>> Thanks Guozhang:
>>
>> I put server.log in the command line, got the the following error:
>>
>> -bash-4.1$ ./kafka-run-class.sh kafka.tools.DumpLogSegments --files
>> /home/kafka/logs/server.log
>> Dumping /home/sfdc/logs/liveAgent/kafka/logs/server.log
>> Exception in thread "main" java.lang.NumberFormatException: For input
>> string: "server"
>> at
>>
>> java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
>> at java.lang.Long.parseLong(Long.java:589)
>> at java.lang.Long.parseLong(Long.java:631)
>> at
>> scala.collection.immutable.StringLike$class.toLong(StringLike.scala:230)
>> at scala.collection.immutable.StringOps.toLong(StringOps.scala:31)
>> at
>>
>> kafka.tools.DumpLogSegments$.kafka$tools$DumpLogSegments$$dumpLog(DumpLogSegments.scala:135)
>> at
>>
>> kafka.tools.DumpLogSegments$$anonfun$main$1.apply(DumpLogSegments.scala:77)
>> at
>>
>> kafka.tools.DumpLogSegments$$anonfun$main$1.apply(DumpLogSegments.scala:73)
>> at
>>
>> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>> at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
>> at kafka.tools.DumpLogSegments$.main(DumpLogSegments.scala:73)
>> at kafka.tools.DumpLogSegments.main(DumpLogSegments.scala)
>>
>> Also how to pass the right deserializers?
>>
>> Thanks,
>> Fang
>>
>> On Wed, Mar 16, 2016 at 4:15 PM, Guozhang Wang <wangg...@gmail.com>
>> wrote:
>>
>> > Fang,
>> >
>> > You can use the kafka.tools.DumpLogSegments to scan and view the logs,
>> but
>> > you need the right deserializers to illustrate the content.
>> >
>> > Guozhang
>> >
>> > On Wed, Mar 16, 2016 at 4:03 PM, Fang Wong <fw...@salesforce.com>
>> wrote:
>> >
>> > > Thanks Guozhang!
>> > > We are in the process of upgrading to 0.9.0.0. We will look into using
>> > > ACLs.
>> > >
>> > > Is there a way to see what is the request in the kafka server, the
>> > request
>> > > for my case is byte[]? Is there a way to turn on kafka logging to see
>> the
>> > > request on the kafka server side?
>> > >
>> > > Thanks,
>> > > Fang
>> > >
>> > > On Wed, Mar 16, 2016 at 12:03 PM, Guozhang Wang <wangg...@gmail.com>
>> > > wrote:
>> > >
>> > > > Before 0.9 and before anyone knows your server host / port can
>> produce
>> > > > request to you unless you have a hardware LB or firewall.
>> > > >
>> > > > In the recent release of 0.9, there is a Security feature added to
>> > Kafka,
>> > > > including encryption / authentication / authorization. For your
>> case, I
>> > > > would suggest you upgrade to 0.9 and use its authorization mechanism
>> > > using
>> > > > ACLs.
>> > > >
>> > > > Guozhang
>> > > >
>> > > >
>> > > > On Wed, Mar 16, 2016 at 11:36 AM, Fang Wong <fw...@salesforce.com>
>> > > wrote:
>> > > >
>> > > > > Hi Guozhang,
>> > > > >
>> > > > > The problem is that server "10.225.36.226" is not one of my kafka
>> > > > clients,
>> > > > > nslookup shows it is another internal server, my servers are like
>> > > > > 10.224.146.6 <http://10.224.146.63:9092/>#, I can't even login to
>> > that
>> > > > > server. All of my messages are at most a few KB.
>> > > > >
>> > > > > Is it possible anybody within the internal network can send any
>> > message
>

Re: Kafka 0.8.1.1 keeps full GC

2016-03-13 Thread Manikumar Reddy
Hi,

These logs are minor GC logs and they look normal. Look for the word 'Full'
 for full gc log  details.
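
If the GC log is large, the search can be automated. Below is a minimal Java
sketch that keeps only the lines containing the marker "Full GC". It assumes
the JVM writes that marker for full collections, as the older HotSpot log
format does; the class name and the sample full-GC line are made up for
illustration, while the other two sample lines are taken from the log quoted
below.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Illustrative only: pick full-GC entries out of a GC log. */
final class GcLogScan {

    /** Returns the lines that contain the "Full GC" marker. */
    static List<String> fullGcLines(List<String> lines) {
        List<String> hits = new ArrayList<>();
        for (String line : lines) {
            if (line.contains("Full GC")) {
                hits.add(line);
            }
        }
        return hits;
    }

    public static void main(String[] args) {
        List<String> sample = Arrays.asList(
            "99546.446: [CMS-concurrent-mark-start]",
            "99549.132: [GC 99549.132: [ParNew: 284786K->4184K(314560K), 0.0158450 secs]",
            // Hypothetical line, shown only to illustrate a match.
            "12.345: [Full GC 12.345: [CMS: 600000K->500000K(699072K), 1.2345670 secs]");
        for (String hit : fullGcLines(sample)) {
            System.out.println(hit);
        }
    }
}
```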




On Sun, Mar 13, 2016 at 3:06 PM, li jinyu  wrote:

> I'm using Kafka 0.8.1.1, have 10 nodes in a cluster, all are started with
> default command:
> ./bin/kafka-server-start.sh conf/server.properties
>
> but yesterday, I found that two nodes kept full GC(every 1-3 seconds),
> although there was still enough memory:
>
> 99546.435: [GC [1 CMS-initial-mark: 532714K(699072K)] 538916K(1013632K),
> 0.0099270 secs] [Times: user=0.01 sys=0.00, real=0.01 secs]
> 99546.446: [CMS-concurrent-mark-start]
> 99546.520: [CMS-concurrent-mark: 0.074/0.074 secs] [Times: user=0.21
> sys=0.03, real=0.08 secs]
> 99546.520: [CMS-concurrent-preclean-start]
> 99546.525: [CMS-concurrent-preclean: 0.005/0.005 secs] [Times: user=0.01
> sys=0.00, real=0.00 secs]
> 99546.525: [CMS-concurrent-abortable-preclean-start]
> 99547.348: [CMS-concurrent-abortable-preclean: 0.822/0.823 secs] [Times:
> user=1.53 sys=0.31, real=0.83 secs]
> 99547.350: [GC[YG occupancy: 158101 K (314560
> K)]2016-03-12T20:19:58.597+0800: 99547.350: [GC 99547.350: [ParNew:
> 158101K->5170K(314560K), 0.0189700 secs] 690816K->538498K(1013632K),
> 0.0190720 secs] [Times: user=0.11 sys=0.00, real=0.02 secs]
> 99547.369: [Rescan (parallel) , 0.0099240 secs]99547.379: [weak refs
> processing, 0.090 secs]99547.379: [class unloading, 0.0028860
> secs]99547.382: [scrub symbol table, 0.0015400 secs]99547.383: [scrub
> string table, 0.0001640 secs] [1 CMS-remark: 533327K(699072K)]
> 538498K(1013632K), 0.0353890 secs] [Times: user=0.18 sys=0.00, real=0.03
> secs]
> 99547.386: [CMS-concurrent-sweep-start]
> 99547.421: [CMS-concurrent-sweep: 0.035/0.035 secs] [Times: user=0.08
> sys=0.01, real=0.04 secs]
> 99547.421: [CMS-concurrent-reset-start]
> 99547.426: [CMS-concurrent-reset: 0.005/0.005 secs] [Times: user=0.01
> sys=0.01, real=0.00 secs]
> 99549.132: [GC 99549.132: [ParNew: 284786K->4184K(314560K), 0.0158450 secs]
> 816880K->536891K(1013632K), 0.0159570 secs] [Times: user=0.08 sys=0.00,
> real=0.01 secs]
>
>
> is this caused by small heap (1G by default), or consumer lag?
>
> I tried to use jmap to check the heap, but failed to attach to the java
> process.
> how to find the root cause?
>
> --
> Don't schedule every day, make them disorder.
>


Re: Kafka 0.9.0.1 broker 0.9 consumer location of consumer group data

2016-03-09 Thread Manikumar Reddy
We need to pass the "--new-consumer" option to the kafka-consumer-groups.sh
command to make it use the new consumer.

sh kafka-consumer-groups.sh  --bootstrap-server localhost:9092 --list
 --new-consumer


On Thu, Mar 10, 2016 at 12:02 PM, Rajiv Kurian  wrote:

> Hi Guozhang,
>
> I tried using the kafka-consumer-groups.sh --list command and it says I
> have no consumer groups set up at all. Yet I am receiving data on 19 out of
> 20 consumer processes (each with their own topic and consumer group).
>
> Here is my full kafka config as printed when my process started up:
>
> metric.reporters = []
>
> metadata.max.age.ms = 30
>
> value.deserializer = class
> sf.org.apache.kafka9.common.serialization.ByteArrayDeserializer
>
> group.id = myTopic_consumer
>
> partition.assignment.strategy =
> [sf.org.apache.kafka9.clients.consumer.RangeAssignor]
>
> reconnect.backoff.ms = 50
>
> sasl.kerberos.ticket.renew.window.factor = 0.8
>
> max.partition.fetch.bytes = 1048576
>
> bootstrap.servers = [myBroker1:9092, myBroker2:9092,
> myBroker3:9092]
>
> retry.backoff.ms = 100
>
> sasl.kerberos.kinit.cmd = /usr/bin/kinit
>
> sasl.kerberos.service.name = null
>
> sasl.kerberos.ticket.renew.jitter = 0.05
>
> ssl.keystore.type = JKS
>
> ssl.trustmanager.algorithm = PKIX
>
> enable.auto.commit = false
>
> ssl.key.password = null
>
> fetch.max.wait.ms = 1000
>
> sasl.kerberos.min.time.before.relogin = 6
>
> connections.max.idle.ms = 54
>
> ssl.truststore.password = null
>
> session.timeout.ms = 3
>
> metrics.num.samples = 2
>
> client.id =
>
> ssl.endpoint.identification.algorithm = null
>
> key.deserializer = class sf.disco.kafka.VoidDeserializer
>
> ssl.protocol = TLS
>
> check.crcs = true
>
> request.timeout.ms = 4
>
> ssl.provider = null
>
> ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
>
> ssl.keystore.location = null
>
> heartbeat.interval.ms = 3000
>
> auto.commit.interval.ms = 5000
>
> receive.buffer.bytes = 32768
>
> ssl.cipher.suites = null
>
> ssl.truststore.type = JKS
>
> security.protocol = PLAINTEXT
>
> ssl.truststore.location = null
>
> ssl.keystore.password = null
>
> ssl.keymanager.algorithm = SunX509
>
> metrics.sample.window.ms = 3
>
> fetch.min.bytes = 256
>
> send.buffer.bytes = 131072
>
> auto.offset.reset = earliest
>
> It prints out the group.id field as myTopic_consumer. I was expecting to
> get this in the --list command and yet I am not getting it. Is this the
> name of the consumer group or am I missing something?
>
> I use the subscribe call on the consumer and my understanding was that the
> subscribe call would do all the work needed to create/join a group. Given I
> have a single consumer per group and a single group per topic I'd expect to
> see 20 groups (1 for each of my topics). However the --list returns no
> groups at all!
>
> Thanks,
> Rajiv
>
> On Wed, Mar 9, 2016 at 8:22 PM, Guozhang Wang  wrote:
>
> > Rajiv,
> >
> > In the new Java consumer you used, the ZK dependency has been removed and
> > hence you wont see any data from ZK path.
> >
> > To check the group metadata you can use the ConsumerGroupCommand, wrapped
> > in bin/kafka-consumer-groups.sh.
> >
> > Guozhang
> >
> > On Wed, Mar 9, 2016 at 5:48 PM, Rajiv Kurian  wrote:
> >
> > > Don't think I made my questions clear:
> > >
> > > On Kafka 0.9.0.1 broker and 0.9 consumer how do I tell what my
> > > consumer-groups are? Can I still get this information in ZK? I don't
> see
> > > anything in the consumers folder which is alarming to me. This is
> > > especially alarming because I do see that 8 partitions are assigned on
> > the
> > > consumer (via jmx). I specify the consumer group using:
> > >
> > > String myConsumerGroupId = myTopic + "_consumer";
> > >
> > >
> >
> props.put(org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG,
> > >  myConsumerGroupId);
> > >
> > > I am running with this setup on about 20 consumers (each consuming a
> > unique
> > > topic) and I only see one of my consumers not passing any messages to
> my
> > > application even though I see that the jmx console says it is
> receiving 5
> > > requests per second. The other 19 seem to be working fine.
> > >
> > > Each of these 20 topics was created when a message was sent to it i.e.
> it
> > > was not provisioned from before. Messages currently are only being sent
> > to
> > > partition 0 even though there are 8 partitions per topic.
> > >
> > >
> > > Thanks,
> > >
> > > Rajiv
> > >
> > > On Wed, Mar 9, 2016 at 4:30 PM, Rajiv Kurian 
> wrote:
> > >
> > > > Also forgot to mention that when I do consume with the console
> 

Re: Regarding issue in Kafka-0.8.2.2.3

2016-02-08 Thread Manikumar Reddy
The Kafka scripts use the "kafka-run-class.sh" script to set environment
variables and run classes. So if you set any environment variable in the
"kafka-run-class.sh" script, it applies to all the scripts. Try setting a
different JMX_PORT in kafka-topics.sh.

On Mon, Feb 8, 2016 at 9:24 PM, Shishir Shivhare 
wrote:

> Hi,
>
> In order to get metrics through JMX, we have exported JMX_PORT=8099. But
> when we are trying to delete the topics from kafka, we are getting
> following issue:
>
> Error: Exception thrown by the agent : java.rmi.server.ExportException:
> Port already in use: 8099; nested exception is:
> java.net.BindException: Address already in use
>
>
> When we do not export JMX_PORT then it works fine i.e. we are able to
> delete topic. So my assumption is that when JMX_PORT is exported by us to
> get JMX metrics, this variable is also used by delete process while
> deleting the topic which gives error as this port is already used.
> Can you please help us regarding this?
>
>
>
> Thanks & Regards,
> Shishir Shivhare
>
> On 8 February 2016 at 21:23, Shishir Shivhare 
> wrote:
>
> > Hi,
> >
> > In order to get metrics through JMX, we have exported JMX_PORT=8099. But
> > when we are trying to delete the topics from kafka, we are getting
> > following issue:
> >
> > Error: Exception thrown by the agent : java.rmi.server.ExportException:
> > Port already in use: 1234; nested exception is:
> > java.net.BindException: Address already in use
> >
> >
> > When we do not export JMX_PORT then it works fine i.e. we are able to
> > delete topic. So my assumption is that when JMX_PORT is exported by us to
> > get JMX metrics, this variable is also used by delete process while
> > deleting the topic which gives error as this port is already used.
> > Can you please help us regarding this?
> >
> >
> >
> > Thanks & Regards,
> > Shishir Shivhare
> >
>


Re: Detecting broker version programmatically

2016-02-04 Thread Manikumar Reddy
Currently it is available through a JMX MBean. It is not available through
the wire protocol/requests.

Pending JIRAs related to this:
https://issues.apache.org/jira/browse/KAFKA-2061

On Fri, Feb 5, 2016 at 4:31 AM,  wrote:

> Is there a way to detect the broker version (even at a high level 0.8 vs
> 0.9) using the kafka-clients Java library?
>
> --
> Best regards,
> Marko
> www.kafkatool.com
>
>


Re: Detecting broker version programmatically

2016-02-04 Thread Manikumar Reddy
@James
   It is broker-id  for Kafka server and client-id for java
producer/consumer apps

@Dana
   Yes, we can infer using custom logic.


Re: Producer code to a partition

2016-02-03 Thread Manikumar Reddy
In Kafka, each record can have a key. This key is used to distribute
records to partitions.
All non-keyed records are distributed in round-robin fashion.
All keyed records are distributed based on the hash of the key, or you can
write a custom partitioner.
Alternatively, you can specify the partition number for each message using
the "ProducerRecord" constructor.

https://kafka.apache.org/documentation.html#theproducer
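
The three rules above can be sketched in plain Java as follows. This is a
simplified stand-in, not the producer's code: the class and method names are
invented for illustration, and Arrays.hashCode replaces the murmur2 hash that
the default partitioner applies to the serialized key, so the partition
numbers it returns will differ from what a real producer picks.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;

/** Illustrative only: explicit partition, key hash, or round-robin. */
final class PartitionChoice {
    private final int numPartitions;
    private final AtomicInteger counter = new AtomicInteger();

    PartitionChoice(int numPartitions) {
        if (numPartitions <= 0) {
            throw new IllegalArgumentException("numPartitions must be positive");
        }
        this.numPartitions = numPartitions;
    }

    int choose(Integer explicitPartition, String key) {
        // 1. A partition number given with the record always wins.
        if (explicitPartition != null) {
            if (explicitPartition < 0 || explicitPartition >= numPartitions) {
                throw new IllegalArgumentException("no such partition: " + explicitPartition);
            }
            return explicitPartition;
        }
        // 2. Keyed record: hash the key bytes (stand-in hash, see note above).
        if (key != null) {
            int hash = Arrays.hashCode(key.getBytes(StandardCharsets.UTF_8));
            return (hash & 0x7fffffff) % numPartitions; // mask keeps it non-negative
        }
        // 3. Non-keyed record: cycle through the partitions.
        return (counter.getAndIncrement() & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        PartitionChoice chooser = new PartitionChoice(8);
        System.out.println("explicit:    " + chooser.choose(3, "sensor-1"));
        System.out.println("keyed:       " + chooser.choose(null, "sensor-1"));
        System.out.println("round-robin: " + chooser.choose(null, null));
    }
}
```

The practical consequence is that records with the same key always land in
the same partition, while non-keyed records are spread evenly.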

On Thu, Feb 4, 2016 at 11:53 AM, Joe San <codeintheo...@gmail.com> wrote:

> What is the partition key? Why do I need to specify the partition key and a
> partition number?
>
> On Thu, Feb 4, 2016 at 7:17 AM, Manikumar Reddy <manikumar.re...@gmail.com
> >
> wrote:
>
> > Hi,
> >
> >  You can use ProducerRecord(java.lang.String topic, java.lang.Integer
> > partition, K key, V value) constructor
> >   to pass partition number.
> >
> >
> >
> >
> https://kafka.apache.org/090/javadoc/org/apache/kafka/clients/producer/ProducerRecord.html
> >
> > Kumar
> >
> > On Thu, Feb 4, 2016 at 11:41 AM, Joe San <codeintheo...@gmail.com>
> wrote:
> >
> > > Kafka users,
> > >
> > > The code below is something that I have to write to a Topic!
> > >
> > > def publishMessage(tsDataPoints: Seq[DataPoint]): Future[Unit] = {
> > >   Future {
> > > logger.info(s"Persisting ${tsDataPoints.length} data-points in
> > > Kafka topic ${producerConfig.topic}")
> > > val dataPoints = DataPoints("kafkaProducer", tsDataPoints)
> > > val jsonMessage = Json.toJson(dataPoints).toString()
> > > val recordMetaDataF = producer.send(
> > >   new ProducerRecord[String, String](producerConfig.topic,
> > jsonMessage)
> > > )
> > > // if we don't make it to Kafka within 3 seconds, we timeout
> > > val recordMetaData = recordMetaDataF.get(3, TimeUnit.SECONDS)
> > > logger.info(
> > >   s"persisted ${tsDataPoints.length} data-points to kafka topic:
> " +
> > > s"${recordMetaData.topic()} partition:
> > > ${recordMetaData.partition()} offset: ${recordMetaData.offset()}"
> > > )
> > > ()
> > >   }
> > > }
> > >
> > >
> > > How could I make this code write to a specific partition? currently my
> > > topic does not have partitions, so by default this code write to
> > partition
> > > 0 of the topic!
> > >
> > > I'm using Kafka 0.9.0.0! Any suggestions?
> > >
> > > Regards,
> > > Joe
> > >
> >
>


Re: Producer code to a partition

2016-02-03 Thread Manikumar Reddy
Hi,

 You can use ProducerRecord(java.lang.String topic, java.lang.Integer
partition, K key, V value) constructor
  to pass partition number.


https://kafka.apache.org/090/javadoc/org/apache/kafka/clients/producer/ProducerRecord.html

Kumar

On Thu, Feb 4, 2016 at 11:41 AM, Joe San  wrote:

> Kafka users,
>
> The code below is something that I have to write to a Topic!
>
> def publishMessage(tsDataPoints: Seq[DataPoint]): Future[Unit] = {
>   Future {
> logger.info(s"Persisting ${tsDataPoints.length} data-points in
> Kafka topic ${producerConfig.topic}")
> val dataPoints = DataPoints("kafkaProducer", tsDataPoints)
> val jsonMessage = Json.toJson(dataPoints).toString()
> val recordMetaDataF = producer.send(
>   new ProducerRecord[String, String](producerConfig.topic, jsonMessage)
> )
> // if we don't make it to Kafka within 3 seconds, we timeout
> val recordMetaData = recordMetaDataF.get(3, TimeUnit.SECONDS)
> logger.info(
>   s"persisted ${tsDataPoints.length} data-points to kafka topic:  " +
> s"${recordMetaData.topic()} partition:
> ${recordMetaData.partition()} offset: ${recordMetaData.offset()}"
> )
> ()
>   }
> }
>
>
> How could I make this code write to a specific partition? currently my
> topic does not have partitions, so by default this code write to partition
> 0 of the topic!
>
> I'm using Kafka 0.9.0.0! Any suggestions?
>
> Regards,
> Joe
>


Re: [VOTE] 0.8.2.2 Candidate 1

2015-09-09 Thread Manikumar Reddy
+1 (non-binding). verified the artifacts, quick start.

On Wed, Sep 9, 2015 at 2:41 AM, Ashish  wrote:

> +1 (non-binding)
>
> Ran the build, works fine. All test cases passed
>
> On Thu, Sep 3, 2015 at 9:22 AM, Jun Rao  wrote:
> > This is the first candidate for release of Apache Kafka 0.8.2.2. This
> only
> > fixes two critical issues (KAFKA-2189 and KAFKA-2308) related to snappy
> in
> > 0.8.2.1.
> >
> > Release Notes for the 0.8.2.2 release
> >
> https://people.apache.org/~junrao/kafka-0.8.2.2-candidate1/RELEASE_NOTES.html
> >
> > *** Please download, test and vote by Tuesday, Sep 8, 7pm PT
> >
> > Kafka's KEYS file containing PGP keys we use to sign the release:
> > http://kafka.apache.org/KEYS in addition to the md5, sha1
> > and sha2 (SHA256) checksum.
> >
> > * Release artifacts to be voted upon (source and binary):
> > https://people.apache.org/~junrao/kafka-0.8.2.2-candidate1/
> >
> > * Maven artifacts to be voted upon prior to release:
> > https://repository.apache.org/content/groups/staging/
> >
> > * scala-doc
> > https://people.apache.org/~junrao/kafka-0.8.2.2-candidate1/scaladoc/
> >
> > * java-doc
> > https://people.apache.org/~junrao/kafka-0.8.2.2-candidate1/javadoc/
> >
> > * The tag to be voted upon (off the 0.8.2 branch) is the 0.8.2.2 tag
> >
> https://git-wip-us.apache.org/repos/asf?p=kafka.git;a=tag;h=d01226cfdcb3d9daad8465234750fa515a1e7e4a
> >
> > /***
> >
> > Thanks,
> >
> > Jun
>
>
>
> --
> thanks
> ashish
>
> Blog: http://www.ashishpaliwal.com/blog
> My Photo Galleries: http://www.pbase.com/ashishpaliwal
>


Re: Query - Compression

2015-08-24 Thread Manikumar Reddy
Hi,
   If you are using the producer's built-in compression (by
setting the compression.type property),
then the consumer will automatically decompress the data for you.
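
As a minimal sketch, the producer-side setting could be assembled like this.
Only java.util.Properties is used here; the broker address is a placeholder,
the class and method names are invented for illustration, and a real producer
would also need serializer settings before the properties are handed to it.

```java
import java.util.Properties;

/** Illustrative only: producer settings that switch on compression. */
final class CompressionConfig {

    static Properties producerProps(String bootstrapServers, String codec) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        // Compression is chosen on the producer side, e.g. "snappy" or "gzip".
        props.setProperty("compression.type", codec);
        return props;
    }

    public static void main(String[] args) {
        // "localhost:9092" is a placeholder broker address.
        Properties props = producerProps("localhost:9092", "snappy");
        System.out.println(props.getProperty("compression.type"));
    }
}
```

The consumer configuration needs no matching entry for this.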


Kumar


On Mon, Aug 24, 2015 at 12:19 PM, ram kumar ramkumarro...@gmail.com wrote:

 Hi,

 If i compress the data in producer as snappy,
 while consuming should i  decompress it ?

 Thankz



Re: spark broadcast variable of Kafka producer throws ConcurrentModificationException

2015-08-18 Thread Manikumar Reddy
Hi,
  It looks like the exception occurs during Kryo serialization. Make sure
you are not concurrently modifying a java.util.Vector data structure.

kumar

On Wed, Aug 19, 2015 at 3:32 AM, Shenghua(Daniel) Wan wansheng...@gmail.com
 wrote:

 Hi,
 Did anyone see java.util.ConcurrentModificationException when using
 broadcast variables?
 I encountered this exception when wrapping a Kafka producer like this in
 the spark streaming driver.

 Here is what I did.
 KafkaProducer<String, String> producer = new KafkaProducer<String,
 String>(properties);
 final Broadcast<KafkaDataProducer> bCastProducer
 = streamingContext.sparkContext().broadcast(producer);

 Then within a closure called by a foreachRDD, I was trying to get the
 wrapped producer, i.e.
  KafkaProducer<String, String> p = bCastProducer.value();

 after rebuilding and rerunning, I got the stack trace like this

 Exception in thread "main" com.esotericsoftware.kryo.KryoException:
 java.util.ConcurrentModificationException
 Serialization trace:
 classes (sun.misc.Launcher$AppClassLoader)
 classloader (java.security.ProtectionDomain)
 context (java.security.AccessControlContext)
 acc (org.apache.spark.util.MutableURLClassLoader)
 contextClassLoader (org.apache.kafka.common.utils.KafkaThread)
 ioThread (org.apache.kafka.clients.producer.KafkaProducer)
 producer (my driver)
 at

 com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:585)
 at

 com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
 at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
 at

 com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
 at

 com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
 at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
 at

 com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:318)
 at

 com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:293)
 at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
 at

 com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
 at

 com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
 at com.esotericsoftware.kryo.Kryo.writeObjectOrNull(Kryo.java:549)
 at

 com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:570)
 at

 com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
 at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
 at

 com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
 at

 com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
 at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
 at

 com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
 at

 com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
 at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
 at

 com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
 at

 com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
 at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
 at

 org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:148)
 at

 org.apache.spark.broadcast.TorrentBroadcast$.blockifyObject(TorrentBroadcast.scala:203)
 at

 org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:102)
 at

 org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:85)
 at

 org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34)
 at

 org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:62)
 at org.apache.spark.SparkContext.broadcast(SparkContext.scala:1289)
 at

 org.apache.spark.api.java.JavaSparkContext.broadcast(JavaSparkContext.scala:648)
 at my driver
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at

 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
 at

 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:606)
 at

 org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:664)
 at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:169)
 at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:192)
 at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:111)
 at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
 Caused by: java.util.ConcurrentModificationException
 at 

Re: [DISCUSSION] Kafka 0.8.2.2 release?

2015-08-14 Thread Manikumar Reddy
+1  for 0.8.2.2 release

On Fri, Aug 14, 2015 at 5:49 PM, Ismael Juma ism...@juma.me.uk wrote:

 I think this is a good idea as the change is minimal on our side and it has
 been tested in production for some time by the reporter.

 Best,
 Ismael

 On Fri, Aug 14, 2015 at 1:15 PM, Jun Rao j...@confluent.io wrote:

  Hi, Everyone,
 
  Since the release of Kafka 0.8.2.1, a number of people have reported an
  issue with snappy compression
  (https://issues.apache.org/jira/browse/KAFKA-2189). Basically, if they use
  snappy in 0.8.2.1, they will experience a 2-3X space increase. The issue
  has since been fixed in trunk (just a snappy jar upgrade). Since 0.8.3 is
  still a few months away, it may make sense to do an 0.8.2.2 release just to
  fix this issue. Any objections?
 
  Thanks,
 
  Jun
 



Re: logging for Kafka new producer

2015-08-11 Thread Manikumar Reddy
The new producer logs through SLF4J, so you can plug in any supported logging
framework, such as log4j, java.util.logging, or logback.

On Tue, Aug 11, 2015 at 11:38 AM, Tao Feng fengta...@gmail.com wrote:

 Hi,

 I am wondering what Kafka new producer uses for logging. Is it  log4j?

 Thanks,
 -Tao



Re: Partition and consumer configuration

2015-08-10 Thread Manikumar Reddy
Hi,



 1. Will Kafka distribute the 100 serialized files randomly (say 20 files go
 to Partition 1, 25 to Partition 2, etc.), or do I have an option to configure
 how many files go to which partition?


   Assuming you are using the new producer:

   - All keyed messages are distributed based on the hash of the key.
   - All non-keyed messages are distributed in round-robin fashion.
   - Alternatively, you can specify the partition number for each message
     using the ProducerRecord object.
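
To make the three options above concrete, here is a minimal Python sketch. It
is an illustration only, not the producer's real partitioner: the function
name choose_partition is made up, and CRC32 is an assumed stand-in for
whatever hash the client actually applies to the key.

```python
import itertools
import zlib


def choose_partition(key, num_partitions, counter, explicit_partition=None):
    """Pick a partition following the three rules described above (simplified)."""
    if explicit_partition is not None:
        # The caller pinned the partition on the record itself.
        return explicit_partition
    if key is not None:
        # Keyed message: the same key always maps to the same partition.
        return zlib.crc32(key) % num_partitions
    # Non-keyed message: rotate through the partitions in order.
    return next(counter) % num_partitions


counter = itertools.count()
keyed = [choose_partition(b"file-42", 4, counter) for _ in range(3)]
unkeyed = [choose_partition(None, 4, counter) for _ in range(4)]
pinned = choose_partition(b"file-42", 4, counter, explicit_partition=2)
```

With four partitions, the keyed calls all land on one partition, the non-keyed
calls walk through 0, 1, 2, 3, and the pinned call returns 2.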


 2.  How to configure each consumer to read from a particular partition
 only. Say consumer 1 to read from partition 4 only likewise.


  You can use SimpleConsumer to consume from a particular partition.
   https://kafka.apache.org/documentation.html#simpleconsumerapi


 2. If I have not set the max size what will happen to that 1MB file, I read
 in the documentation that default size is 10KB.


The default for max.request.size on the new producer and for
message.max.bytes on the broker is 1MB.
Anything above these limits will result in an exception.


 Kumar


Re: kafka benchmark tests

2015-07-14 Thread Manikumar Reddy
Yes. bootstrap.servers is a list of Kafka server host/port pairs used to
establish the initial connection to the Kafka cluster.

https://kafka.apache.org/documentation.html#newproducerconfigs

On Tue, Jul 14, 2015 at 7:29 PM, Yuheng Du yuheng.du.h...@gmail.com wrote:

 Does anyone know what bootstrap.servers=esv4-hcl198.grid.linkedin.com:9092
 means in the following test command?

 bin/kafka-run-class.sh org.apache.kafka.clients.tools.ProducerPerformance
 test7 5000 100 -1 acks=1
 bootstrap.servers=esv4-hcl198.grid.linkedin.com:9092
 buffer.memory=67108864 batch.size=8196

 What is bootstrap.servers? Is it the Kafka server that I am running the test
 against?

 Thanks.

 Yuheng




 On Tue, Jul 14, 2015 at 12:18 AM, Ewen Cheslack-Postava e...@confluent.io
 wrote:

  I implemented (nearly) the same basic set of tests in the system test
  framework we started at Confluent and that is going to move into Kafka --
  see the wip patch for KIP-25 here: https://github.com/apache/kafka/pull/70
  In particular, that test is implemented in benchmark_test.py:

  https://github.com/apache/kafka/pull/70/files#diff-ca984778cf9943407645eb6784f19dc8

  Hopefully once that's merged people can reuse that benchmark (and add to
  it!) so they can easily run the same benchmarks across different hardware.
  Here are some results from an older version of that test on m3.2xlarge
  instances on EC2 using local ephemeral storage (I think... it's been a while
  since I ran these numbers and I didn't document methodology that
  carefully):
 
  INFO:_.KafkaBenchmark:=
  INFO:_.KafkaBenchmark:BENCHMARK RESULTS
  INFO:_.KafkaBenchmark:=
  INFO:_.KafkaBenchmark:Single producer, no replication: 684097.470208 rec/sec (65.24 MB/s)
  INFO:_.KafkaBenchmark:Single producer, async 3x replication: 667494.359673 rec/sec (63.66 MB/s)
  INFO:_.KafkaBenchmark:Single producer, sync 3x replication: 116485.764275 rec/sec (11.11 MB/s)
  INFO:_.KafkaBenchmark:Three producers, async 3x replication: 1696519.022182 rec/sec (161.79 MB/s)
  INFO:_.KafkaBenchmark:Message size:
  INFO:_.KafkaBenchmark: 10: 1637825.195625 rec/sec (15.62 MB/s)
  INFO:_.KafkaBenchmark: 100: 605504.877911 rec/sec (57.75 MB/s)
  INFO:_.KafkaBenchmark: 1000: 90351.817570 rec/sec (86.17 MB/s)
  INFO:_.KafkaBenchmark: 10000: 8306.180862 rec/sec (79.21 MB/s)
  INFO:_.KafkaBenchmark: 100000: 978.403499 rec/sec (93.31 MB/s)
  INFO:_.KafkaBenchmark:Throughput over long run, data  memory:
  INFO:_.KafkaBenchmark: Time block 0: 684725.151324 rec/sec (65.30 MB/s)
  INFO:_.KafkaBenchmark:Single consumer: 701031.14 rec/sec (56.830500 MB/s)
  INFO:_.KafkaBenchmark:Three consumers: 3304011.014900 rec/sec (267.830800 MB/s)
  INFO:_.KafkaBenchmark:Producer + consumer:
  INFO:_.KafkaBenchmark: Producer: 624984.375391 rec/sec (59.60 MB/s)
  INFO:_.KafkaBenchmark: Consumer: 624984.375391 rec/sec (59.60 MB/s)
  INFO:_.KafkaBenchmark:End-to-end latency: median 2.00 ms, 99% 4.00 ms, 99.9% 19.00 ms
 
  Don't trust these numbers for anything, they were a quick one-off test. I'm
  just pasting the output so you get some idea of what the results might look
  like. Once we merge the KIP-25 patch, Confluent will be running the tests
  regularly and results will be available publicly so we'll be able to keep
  better tabs on performance, albeit for only a specific class of hardware.
 
  For the batch.size question -- I'm not sure the results in the blog post
  actually have different settings, it could be accidental divergence between
  the script and the blog post. The post specifically notes that tuning the
  batch size in the synchronous case might help, but that he didn't do that.
  If you're trying to benchmark the *optimal* throughput, tuning the batch
  size would make sense. Since synchronous replication will have higher
  latency and there's a limit to how many requests can be in flight at once,
  you'll want a larger batch size to compensate for the additional latency.
  However, in practice the increase you see may be negligible. Somebody who
  has spent more time fiddling with tweaking producer performance may have
  more insight.
 
  -Ewen
 
  On Mon, Jul 13, 2015 at 10:08 AM, JIEFU GONG jg...@berkeley.edu wrote:
 
   Hi all,
  
   I was wondering if any of you guys have done benchmarks on Kafka
   performance before, and if they or their details (# nodes in cluster, #
   records / size(s) of messages, etc.) could be shared.
  
   For comparison purposes, I am trying to benchmark Kafka against some
   similar services such as Kinesis or Scribe. Additionally, I was wondering
   if anyone could shed some insight on Jay Kreps' benchmarks that he has
   openly published here:
  
   https://engineering.linkedin.com/kafka/benchmarking-apache-kafka-2-million-writes-second-three-cheap-machines
  
   Specifically, I am unsure of why between his tests of 3x synchronous
   replication and 3x async 

Re: How to run Kafka in background

2015-06-24 Thread Manikumar Reddy
You can pass the -daemon option to the Kafka startup script:
./kafka-server-start.sh -daemon ../config/server.1.properties

On Wed, Jun 24, 2015 at 4:14 PM, bit1...@163.com bit1...@163.com wrote:

 Hi,

 I am using Kafka 0.8.2.1, and when I start Kafka with the script:
 ./kafka-server-start.sh ../config/server.1.properties

 I expected it to run as a background process, but when I close the terminal
 the server shuts down, so it does not appear to be running in the background.

 How can I run it in the background? Thanks!






 bit1...@163.com



Re: Issue with log4j Kafka Appender.

2015-06-18 Thread Manikumar Reddy
You can enable the producer debug log to verify. In 0.8.2.0, you can set the
compressionType, requiredNumAcks, and syncSend producer config properties in
log4j.xml. The trunk build also accepts an additional retries property.


Manikumar

On Thu, Jun 18, 2015 at 1:14 AM, Madhavi Sreerangam 
madhavi.sreeran...@gmail.com wrote:

 I have configured my log4j with Kafka Appender.(Kafka version 0.8.2.0)
 Following are entries from my log4j file

 log4j.appender.KAFKA=kafka.producer.KafkaLog4jAppender
 log4j.appender.KAFKA.BrokerList=iakafka301p.dev.ch3.s.com:9092,iakafka302p.dev.ch3.s.com:9092,iakafka303p.dev.ch3.s.com:9092
 log4j.appender.KAFKA.Topic=dev-1.0_audit
 log4j.appender.KAFKA.Serializer=kafka.test.AppenderStringSerializer
 log4j.appender.KAFKA.layout=org.apache.log4j.PatternLayout
 log4j.appender.KAFKA.layout.ConversionPattern=%m-%d

 Kafka is configured with 3 servers, 3 partitions and 3 replicas.
 I have created a test method to publish the messages to the Kafka topic as
 follows:

 private void testKAFKAlog(int noOfMessages){
     for(int i=0; i < noOfMessages; i++){
         KAFKA_LOG.info("Test Message: " + i);
     }
 }

 I could not see any messages published to the topic. Then I modified
 the test method to introduce some wait between the requests as follows:

 private void testKAFKAlog(int noOfMessages){
     for(int i=0; i < noOfMessages; i++){
         try {
             // Pause between log calls
             Thread.sleep(10);
         } catch (InterruptedException e) {
             e.printStackTrace();
         }
         KAFKA_LOG.info("Test Message: " + i);
     }
 }

 Then all the messages started publishing. I did this exercise a couple of
 times with and without the sleep between the requests. Messages got published
 only when there was a sleep in between the requests.
 Can anyone help me here: what is wrong with the configuration I am
 using? (I can't afford a 10ms wait for each message, as my application logs
 a few million messages for each run.)
 Is there any way that I can override the default ProducerConfig for the log4j
 Kafka appender?



Re: How to specify kafka bootstrap jvm options?

2015-06-17 Thread Manikumar Reddy
Most of the tuning options are available in kafka-run-class.sh. You can
override the required environment variables (KAFKA_HEAP_OPTS,
KAFKA_JVM_PERFORMANCE_OPTS) when invoking the kafka-server-start.sh script.

On Wed, Jun 17, 2015 at 2:11 PM, luo.fucong bayinam...@gmail.com wrote:

 I want to tune the kafka jvm options, but nowhere can I pass the options
 to the kafka startup script(bin/kafka-server-start.sh).
 How to pass in the jvm options?


Re: Log compaction not working as expected

2015-06-16 Thread Manikumar Reddy
Hi,

  Your observation is correct. We never compact the active segment.
  Some improvements are proposed here:
  https://issues.apache.org/jira/browse/KAFKA-1981


Manikumar

On Tue, Jun 16, 2015 at 5:35 PM, Shayne S shaynest...@gmail.com wrote:

 Some further information, and is this a bug?  I'm using 0.8.2.1.

 Log compaction will only occur on the non active segments.  Intentional or
 not, it seems that the last segment is always the active segment.  In other
 words, an expired segment will not be cleaned until a new segment has been
 created.

 As a result, a log won't be compacted until new data comes in (per
 partition). Does this mean I need to send the equivalent of a pig (
 https://en.wikipedia.org/wiki/Pigging) through each partition in order to
 force compaction?  Or can I force the cleaning somehow?

 Here are the steps to recreate:

 1. Create a new topic with a 5 minute segment.ms:

 kafka-topics.sh --zookeeper localhost:2181 --create --topic TEST_TOPIC
 --replication-factor 1 --partitions 1 --config cleanup.policy=compact
 --config min.cleanable.dirty.ratio=0.01 --config segment.ms=300000

 2. Repeatedly add messages with identical keys (3x):

 echo "ABC123,{\"test\": 1}" | kafka-console-producer.sh --broker-list
 localhost:9092 --topic TEST_TOPIC --property parse.key=true --property
 key.separator=, --new-producer

 3. Wait 5+ minutes and confirm no log compaction.
 4. Once satisfied, send a new message:

 echo "DEF456,{\"test\": 1}" | kafka-console-producer.sh --broker-list
 localhost:9092 --topic TEST_TOPIC --property parse.key=true --property
 key.separator=, --new-producer

 5. Log compaction will occur quickly soon after.

 Is my use case of infrequent logs not supported? Is this intentional
 behavior? It's unnecessarily challenging to target each partition with a
 dummy message to trigger compaction.

 Also, I believe there is another issue with logs originally configured
 without a segment timeout that lead to my original issue.  I still cannot
 get those logs to compact.

 Thanks!
 Shayne



Re: Log compaction not working as expected

2015-06-16 Thread Manikumar Reddy
OK, I get your point. Currently we check the log segment constraints
(segment.bytes, segment.ms) only before appending new messages, so a new log
segment is not created until new data arrives.

In your case, your approach (sending a periodic dummy/ping message) should be
fine.
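
To illustrate why the dummy message helps, here is a toy Python model of the
behaviour discussed in this thread. It is only a sketch under simplifying
assumptions: the Log class, its fields, and the millisecond timestamps are
all hypothetical, and the real broker code is considerably more involved.

```python
class Log:
    """Toy log: segment limits are checked only when a message is appended."""

    def __init__(self, segment_ms):
        self.segment_ms = segment_ms
        # The last segment in the list is always the active one.
        self.segments = [{"created": 0, "messages": []}]

    def append(self, now_ms, key, value):
        active = self.segments[-1]
        # The roll check happens here and nowhere else.
        if active["messages"] and now_ms - active["created"] >= self.segment_ms:
            active = {"created": now_ms, "messages": []}
            self.segments.append(active)
        active["messages"].append((key, value))

    def compact(self):
        """Keep the latest value per key, never touching the active segment."""
        closed = self.segments[:-1]
        if not closed:
            return
        latest = {}
        for segment in closed:
            for key, value in segment["messages"]:
                latest[key] = value
        merged = {"created": closed[0]["created"], "messages": list(latest.items())}
        self.segments = [merged, self.segments[-1]]


log = Log(segment_ms=300_000)
for t in (0, 1_000, 2_000):
    log.append(t, "ABC123", {"test": 1})
log.compact()
before = len(log.segments[-1]["messages"])   # duplicates are still there

log.append(400_000, "DEF456", {"test": 1})   # the "pig": forces a segment roll
log.compact()
after = log.segments[0]["messages"]          # old segment is now compacted
```

Until the late message arrives, the three ABC123 records sit in the active
segment and compact() has nothing to work on. The late append rolls a new
segment, and only then can the old one be cleaned.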



On Tue, Jun 16, 2015 at 7:19 PM, Shayne S shaynest...@gmail.com wrote:

 Thank you for the response!

 Unfortunately, those improvements would not help.  It is the lack of
 activity resulting in a new segment that prevents compaction.

 I was confused by what qualifies as the active segment. The active segment
 is the last segment as opposed to the segment that would be written to if
 something were received right now.

 On Tue, Jun 16, 2015 at 8:38 AM, Manikumar Reddy ku...@nmsworks.co.in
 wrote:

  Hi,
 
Your observation is correct.  we never compact the active segment.
Some improvements are proposed here,
https://issues.apache.org/jira/browse/KAFKA-1981
 
 
  Manikumar
 
  On Tue, Jun 16, 2015 at 5:35 PM, Shayne S shaynest...@gmail.com wrote:
 
   Some further information, and is this a bug?  I'm using 0.8.2.1.
  
   Log compaction will only occur on the non active segments.  Intentional
  or
   not, it seems that the last segment is always the active segment.  In
  other
   words, an expired segment will not be cleaned until a new segment has
  been
   created.
  
   As a result, a log won't be compacted until new data comes in (per
   partition). Does this mean I need to send the equivalent of a pig (
   https://en.wikipedia.org/wiki/Pigging) through each partition in order
  to
   force compaction?  Or can I force the cleaning somehow?
  
   Here are the steps to recreate:
  
   1. Create a new topic with a 5 minute segment.ms:
  
   kafka-topics.sh --zookeeper localhost:2181 --create --topic TEST_TOPIC
   --replication-factor 1 --partitions 1 --config cleanup.policy=compact
   --config min.cleanable.dirty.ratio=0.01 --config segment.ms=300000
  
   2. Repeatedly add messages with identical keys (3x):
  
   echo "ABC123,{\"test\": 1}" | kafka-console-producer.sh --broker-list
   localhost:9092 --topic TEST_TOPIC --property parse.key=true --property
   key.separator=, --new-producer
  
   3. Wait 5+ minutes and confirm no log compaction.
   4. Once satisfied, send a new message:
  
   echo "DEF456,{\"test\": 1}" | kafka-console-producer.sh --broker-list
   localhost:9092 --topic TEST_TOPIC --property parse.key=true --property
   key.separator=, --new-producer
  
   5. Log compaction will occur quickly soon after.
  
   Is my use case of infrequent logs not supported? Is this intentional
   behavior? It's unnecessarily challenging to target each partition with
 a
   dummy message to trigger compaction.
  
   Also, I believe there is another issue with logs originally configured
   without a segment timeout that lead to my original issue.  I still
 cannot
   get those logs to compact.
  
   Thanks!
   Shayne
  
 



Re: cannot make another partition reassignment due to the previous partition reassignment failure

2015-06-15 Thread Manikumar Reddy
Hi,
  Just delete the /admin/reassign_partitions znode in ZooKeeper and
try again.

#sh zookeeper-shell.sh localhost:2181
  delete /admin/reassign_partitions


Manikumar


On Tue, Jun 16, 2015 at 8:15 AM, Yu Yang yuyan...@gmail.com wrote:

 HI,

 We have a kafka 0.8.1.1 cluster. Recently I did a partition assignment for
 some topic partitions in the cluster. Due to broker failure, the partition
 reassignment failed. I cannot do another partition assignment now, and
 always get errors as follows. How can we work around this? I have tried
 google for answers, but did not succeed.

 Partitions reassignment failed due to Partition reassignment currently in
 progress for Map(). Aborting operation
 kafka.common.AdminCommandFailedException: Partition reassignment currently
 in progress for Map(). Aborting operation
 at kafka.admin.ReassignPartitionsCommand.reassignPartitions(ReassignPartitionsCommand.scala:204)
 at kafka.admin.ReassignPartitionsCommand$.executeAssignment(ReassignPartitionsCommand.scala:124)
 at kafka.admin.ReassignPartitionsCommand$.main(ReassignPartitionsCommand.scala:49)
 at kafka.admin.ReassignPartitionsCommand.main(ReassignPartitionsCommand.scala)


 Thanks!

 Regards,
 Yu



Re: Producer RecordMetaData with Offset -1

2015-06-12 Thread Manikumar Reddy
Hi,

What value is set for the acks config property?
If acks=0, the producer will not wait for any acknowledgment from the
server, and the offset given back for each record will always be set to -1.
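
A toy Python model of that behaviour, assuming nothing about the real client
internals (the send function and the list used as a broker log are both
hypothetical):

```python
def send(record, acks, broker_log):
    """Toy send: return the offset the caller would see for this record."""
    broker_log.append(record)
    if acks == 0:
        # Fire and forget: no response is awaited, so no offset is known.
        return -1
    # Otherwise the response carries the offset assigned by the server.
    return len(broker_log) - 1


broker_log = []
fire_and_forget = send("m0", acks=0, broker_log=broker_log)
acknowledged = send("m1", acks=1, broker_log=broker_log)
```

Both records end up in the log, but only the acknowledged send reports a real
offset back to the caller.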

Manikumar

On Fri, Jun 12, 2015 at 7:17 PM, Gokulakannan M (Engineering - Data
Platform) gokulakanna...@flipkart.com wrote:

 Hi,

 I am running a standalone kafka cluster and writing some sample messages
 using the java client API KafkaProducer. The RecordMetaData I am receiving
 from server(KafkaProducer send method return value) contains offset -1. Is
 offset information available only at consumer end and not in producer side
 yet? Or am I missing something?

 I am using kafka version 0.8.2.0





Re: Kafka Rebalance on Watcher event Question

2015-05-11 Thread Manikumar Reddy
All the consumers in the same consumer group share the load across the
given topics/partitions.
So on any consumer failure, there will be a rebalance to assign the
failed consumer's topics/partitions to the live consumers.

Please check the consumer documentation here:
https://kafka.apache.org/documentation.html#introduction

On Mon, May 11, 2015 at 11:17 AM, dinesh kumar dinesh...@gmail.com wrote:

 But why? What is reason for triggering a rebalance if none of the topics of
 a consumers are affected? Is there some reason  for triggering a rebalance
 irrespective of the consumers topics getting affected ?

 On 11 May 2015 at 11:06, Manikumar Reddy ku...@nmsworks.co.in wrote:

  If both C1,C2 belongs to same consumer group, then the re-balance will be
  triggered.
  A consumer subscribes to event changes of the consumer id registry within
  its group.
 
  On Mon, May 11, 2015 at 10:55 AM, dinesh kumar dinesh...@gmail.com
  wrote:
 
   Hi,
   I am looking at the code of kafka.consumer.ZookeeperConsumerConnector.scala
   (link here
   https://github.com/apache/kafka/blob/0.8.2/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
   )
   and I see that all ids registered to a particular group ids are registered
   to  the path /consumers/[group_id]/ids in zookeeper. the ids contain the
   consumer_id - topics mapping.
  
   A watcher is registered in zookeeper that is triggered when there is a
   change to /consumers/[group_id]/ids. This watcher event is handled by the
   class ZKRebalancerListener. This class calls a synced rebalance whenever a
   watcher event is received.
  
   So here is my question.
   1. Lets consider a scenario where there a two topics T1 and T2 and two
   consumers C1 and C2. C1 consumes only from T1 and C2 only from T2. Say if
   C2 dies for some reason as explained before, C1 will get a watcher event
   from zookeeper and a synced rebalance will be triggered. Why does C2 dying
   which has absolutely nothing with C1 (there is no intersection of topics
   between C1 and C2) should trigger a rebalance event in C1. Is there some
   condition where this is necessary that I am missing?
  
   Thanks,
   Dinesh
  
 



Re: Kafka Rebalance on Watcher event Question

2015-05-10 Thread Manikumar Reddy
If both C1 and C2 belong to the same consumer group, then the rebalance will
be triggered.
Each consumer subscribes to changes in the consumer id registry within
its group.
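
A small Python sketch of that mechanism, as a toy model only: GroupRegistry
and its methods are hypothetical names standing in for the
/consumers/[group_id]/ids path and its watchers, not anything from the Kafka
code base.

```python
class GroupRegistry:
    """Toy stand-in for the consumer id registry of one consumer group."""

    def __init__(self):
        self.members = {}     # consumer id -> set of subscribed topics
        self.rebalances = {}  # consumer id -> number of rebalances triggered

    def join(self, consumer_id, topics):
        self.members[consumer_id] = set(topics)
        self.rebalances.setdefault(consumer_id, 0)
        self._notify()

    def leave(self, consumer_id):
        del self.members[consumer_id]
        self._notify()

    def _notify(self):
        # The watch covers the whole registry, so every live member is
        # notified whether or not its own topics are involved.
        for consumer_id in self.members:
            self.rebalances[consumer_id] += 1


registry = GroupRegistry()
registry.join("C1", ["T1"])
registry.join("C2", ["T2"])
c1_before = registry.rebalances["C1"]
registry.leave("C2")                    # C2 dies
c1_after = registry.rebalances["C1"]
```

C1 rebalances once more when C2 leaves, even though the two consumers share
no topic, because the notification is keyed on group membership and not on
topic subscriptions.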

On Mon, May 11, 2015 at 10:55 AM, dinesh kumar dinesh...@gmail.com wrote:

 Hi,
 I am looking at the code of kafka.consumer.ZookeeperConsumerConnector.scala
 (link here
 
 https://github.com/apache/kafka/blob/0.8.2/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
 )
 and I see that all ids registered to a particular group ids are registered
 to  the path /consumers/[group_id]/ids in zookeeper. the ids contain the
 consumer_id - topics mapping.

 A watcher is registered in zookeeper that is triggered when there is a
 change to /consumers/[group_id]/ids. This watcher event is handled by the
 class ZKRebalancerListener. This class calls a synced rebalance whenever a
 watcher event is received.

 So here is my question.
 1. Let's consider a scenario where there are two topics T1 and T2 and two
 consumers C1 and C2. C1 consumes only from T1 and C2 only from T2. If
 C2 dies for some reason, then as explained before, C1 will get a watcher
 event from zookeeper and a synced rebalance will be triggered. Why should C2
 dying, which has absolutely nothing to do with C1 (there is no intersection
 of topics between C1 and C2), trigger a rebalance event in C1? Is there some
 condition where this is necessary that I am missing?

 Thanks,
 Dinesh



Re: New producer: metadata update problem on 2 Node cluster.

2015-04-28 Thread Manikumar Reddy
Hi Ewen,

 Thanks for the response. I agree with you: in some cases we should use the
bootstrap servers.



 If you have logs at debug level, are you seeing this message in between the
 connection attempts:

 Give up sending metadata request since no node is available


 Yes, this log message appeared a couple of times.



 Also, if you let it continue running, does it recover after the
 metadata.max.age.ms timeout?


 It does not reconnect. It keeps trying to connect to the dead
node.


-Manikumar


Re: New producer: metadata update problem on 2 Node cluster.

2015-04-27 Thread Manikumar Reddy
Any comments on this issue?
On Apr 24, 2015 8:05 PM, Manikumar Reddy ku...@nmsworks.co.in wrote:

 We are testing the new producer on a 2-node cluster.
 Under some node failure scenarios, the producer is not able
 to update its metadata.

 Steps to reproduce:
 1. Form a 2-node cluster (K1, K2).
 2. Create a topic with a single partition, replication factor = 2.
 3. Start producing data (producer metadata: K1, K2).
 4. Kill the leader node (say K1).
 5. K2 becomes the leader (producer metadata: K2).
 6. Bring back K1 and kill K2 before metadata.max.age.ms elapses.
 7. K1 becomes the leader (producer metadata still contains: K2).

 After this point, the producer is not able to update the metadata.
 It keeps trying to connect to the dead node (K2).

 This looks like a bug to me. Am I missing anything?



New Java Producer: Single Producer vs multiple Producers

2015-04-24 Thread Manikumar Reddy
We have a 2-node cluster with 100 topics.
Should we use a single producer for all topics or create multiple
producers?
What is the best choice w.r.t. network load/failures, node failures,
latency, and locks?

Regards,
Manikumar


New producer: metadata update problem on 2 Node cluster.

2015-04-24 Thread Manikumar Reddy
We are testing the new producer on a 2-node cluster.
Under some node failure scenarios, the producer is not able
to update its metadata.

Steps to reproduce:
1. Form a 2-node cluster (K1, K2).
2. Create a topic with a single partition, replication factor = 2.
3. Start producing data (producer metadata: K1, K2).
4. Kill the leader node (say K1).
5. K2 becomes the leader (producer metadata: K2).
6. Bring back K1 and kill K2 before metadata.max.age.ms elapses.
7. K1 becomes the leader (producer metadata still contains: K2).

After this point, the producer is not able to update the metadata.
It keeps trying to connect to the dead node (K2).

This looks like a bug to me. Am I missing anything?


Re: New Java Producer: Single Producer vs multiple Producers

2015-04-24 Thread Manikumar Reddy
Hi Jay,

Yes, we are producing from a single process/JVM.

From the docs: "The producer will attempt to batch records together into
fewer requests whenever multiple records are being sent to the same
partition."

If I understand correctly, batching happens at the topic/partition level, not
at the node level. Right?

If so, then both approaches (a single producer for all topics, or a separate
producer for each topic) may give similar performance.

On Fri, Apr 24, 2015 at 9:29 PM, Jay Kreps jay.kr...@gmail.com wrote:

 If you are talking about within a single process, having one producer is
 generally the fastest because batching dramatically reduces the number of
 requests (esp using the new java producer).
 -Jay

 On Fri, Apr 24, 2015 at 4:54 AM, Manikumar Reddy manikumar.re...@gmail.com
 wrote:

  We have a 2 node cluster with 100 topics.
  should we use a single producer for all topics or  create multiple
  producers?
  What is the best choice w.r.t network load/failures, node failures,
  latency, locks?
 
  Regards,
  Manikumar
 



Re: Broker shuts down due to unrecoverable I/O error

2015-03-03 Thread Manikumar Reddy
Hi,
 We are running on RedHat Linux with SAN storage.  This happened only once.

Thanks,
Manikumar.

On Tue, Mar 3, 2015 at 10:02 PM, Jun Rao j...@confluent.io wrote:

 Which OS is this on? Is this easily reproducible?

 Thanks,

 Jun

 On Sun, Mar 1, 2015 at 8:24 PM, Manikumar Reddy ku...@nmsworks.co.in
 wrote:

  Kafka 0.8.2 server got stopped after getting below I/O exception.
  Any thoughts on below exception? Can it be file system related?
 
  [2015-03-01 14:36:27,627] FATAL [KafkaApi-0] Halting due to unrecoverable
  I/O error while handling produce request:  (kafka.server.KafkaApis)
  kafka.common.KafkaStorageException: I/O exception in append to log
  'NOTIFICATION_CATEGORY'
  at kafka.log.Log.append(Log.scala:266)
  at kafka.cluster.Partition$$anonfun$appendMessagesToLeader$1.apply(Partition.scala:379)
  at kafka.cluster.Partition$$anonfun$appendMessagesToLeader$1.apply(Partition.scala:365)
  at kafka.utils.Utils$.inLock(Utils.scala:535)
  at kafka.utils.Utils$.inReadLock(Utils.scala:541)
  at kafka.cluster.Partition.appendMessagesToLeader(Partition.scala:365)
  at kafka.server.KafkaApis$$anonfun$appendToLocalLog$2.apply(KafkaApis.scala:291)
  at kafka.server.KafkaApis$$anonfun$appendToLocalLog$2.apply(KafkaApis.scala:282)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
  at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
  at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
  at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
  at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
  at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
  at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
  at scala.collection.AbstractTraversable.map(Traversable.scala:105)
  at kafka.server.KafkaApis.appendToLocalLog(KafkaApis.scala:282)
  at kafka.server.KafkaApis.handleProducerOrOffsetCommitRequest(KafkaApis.scala:204)
  at kafka.server.KafkaApis.handle(KafkaApis.scala:59)
  at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:59)
  at java.lang.Thread.run(Thread.java:745)
  Caused by: java.io.FileNotFoundException:
  /KAFKA/logs_kafka/NOTIFICATION_CATEGORY-0/03746532.index (No
  such file or directory)
  at java.io.RandomAccessFile.open(Native Method)
  at java.io.RandomAccessFile.<init>(RandomAccessFile.java:241)
  at kafka.log.OffsetIndex$$anonfun$resize$1.apply(OffsetIndex.scala:277)
  at kafka.log.OffsetIndex$$anonfun$resize$1.apply(OffsetIndex.scala:276)
  at kafka.utils.Utils$.inLock(Utils.scala:535)
  at kafka.log.OffsetIndex.resize(OffsetIndex.scala:276)
  at kafka.log.OffsetIndex$$anonfun$trimToValidSize$1.apply$mcV$sp(OffsetIndex.scala:265)
  at kafka.log.OffsetIndex$$anonfun$trimToValidSize$1.apply(OffsetIndex.scala:265)
  at kafka.log.OffsetIndex$$anonfun$trimToValidSize$1.apply(OffsetIndex.scala:265)
  at kafka.utils.Utils$.inLock(Utils.scala:535)
  at kafka.log.OffsetIndex.trimToValidSize(OffsetIndex.scala:264)
  at kafka.log.Log.roll(Log.scala:563)
  at kafka.log.Log.maybeRoll(Log.scala:539)
  at kafka.log.Log.append(Log.scala:306)
 
 
  There are other exceptions also:
 
  [2015-03-01 13:59:18,616] ERROR Uncaught exception in scheduled task
  'kafka-log-retention' (kafka.utils.KafkaScheduler)
  java.io.FileNotFoundException:
  /cygnet/KAFKA/logs_kafka/NOTIF_CH_PROBMGMT_RESOURCE-0/00458729.index
  (No such file or directory)
  at java.io.RandomAccessFile.open(Native Method)
  at java.io.RandomAccessFile.<init>(RandomAccessFile.java:241)
  at kafka.log.OffsetIndex$$anonfun$resize$1.apply(OffsetIndex.scala:277)
  at kafka.log.OffsetIndex$$anonfun$resize$1.apply(OffsetIndex.scala:276)
  at kafka.utils.Utils$.inLock(Utils.scala:535)
  at kafka.log.OffsetIndex.resize(OffsetIndex.scala:276)
  at kafka.log.OffsetIndex$$anonfun$trimToValidSize$1.apply$mcV$sp(OffsetIndex.scala:265)
  at kafka.log.OffsetIndex$$anonfun$trimToValidSize$1.apply(OffsetIndex.scala:265)
  at kafka.log.OffsetIndex$$anonfun$trimToValidSize$1.apply(OffsetIndex.scala:265)
  at kafka.utils.Utils$.inLock(Utils.scala:535)
  at kafka.log.OffsetIndex.trimToValidSize(OffsetIndex.scala:264)
  at kafka.log.Log.roll(Log.scala:563)
  at kafka.log.Log.deleteOldSegments(Log.scala:486)
  at kafka.log.LogManager.kafka$log$LogManager$$cleanupExpiredSegments(LogManager.scala

Broker shuts down due to unrecoverable I/O error

2015-03-01 Thread Manikumar Reddy
Our Kafka 0.8.2 server stopped after hitting the I/O exception below.
Any thoughts on this exception? Could it be file system related?

[2015-03-01 14:36:27,627] FATAL [KafkaApi-0] Halting due to unrecoverable
I/O error while handling produce request:  (kafka.server.KafkaApis)
kafka.common.KafkaStorageException: I/O exception in append to log
'NOTIFICATION_CATEGORY'
at kafka.log.Log.append(Log.scala:266)
at kafka.cluster.Partition$$anonfun$appendMessagesToLeader$1.apply(Partition.scala:379)
at kafka.cluster.Partition$$anonfun$appendMessagesToLeader$1.apply(Partition.scala:365)
at kafka.utils.Utils$.inLock(Utils.scala:535)
at kafka.utils.Utils$.inReadLock(Utils.scala:541)
at kafka.cluster.Partition.appendMessagesToLeader(Partition.scala:365)
at kafka.server.KafkaApis$$anonfun$appendToLocalLog$2.apply(KafkaApis.scala:291)
at kafka.server.KafkaApis$$anonfun$appendToLocalLog$2.apply(KafkaApis.scala:282)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at kafka.server.KafkaApis.appendToLocalLog(KafkaApis.scala:282)
at kafka.server.KafkaApis.handleProducerOrOffsetCommitRequest(KafkaApis.scala:204)
at kafka.server.KafkaApis.handle(KafkaApis.scala:59)
at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:59)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.FileNotFoundException:
/KAFKA/logs_kafka/NOTIFICATION_CATEGORY-0/03746532.index (No
such file or directory)
at java.io.RandomAccessFile.open(Native Method)
at java.io.RandomAccessFile.<init>(RandomAccessFile.java:241)
at kafka.log.OffsetIndex$$anonfun$resize$1.apply(OffsetIndex.scala:277)
at kafka.log.OffsetIndex$$anonfun$resize$1.apply(OffsetIndex.scala:276)
at kafka.utils.Utils$.inLock(Utils.scala:535)
at kafka.log.OffsetIndex.resize(OffsetIndex.scala:276)
at kafka.log.OffsetIndex$$anonfun$trimToValidSize$1.apply$mcV$sp(OffsetIndex.scala:265)
at kafka.log.OffsetIndex$$anonfun$trimToValidSize$1.apply(OffsetIndex.scala:265)
at kafka.log.OffsetIndex$$anonfun$trimToValidSize$1.apply(OffsetIndex.scala:265)
at kafka.utils.Utils$.inLock(Utils.scala:535)
at kafka.log.OffsetIndex.trimToValidSize(OffsetIndex.scala:264)
at kafka.log.Log.roll(Log.scala:563)
at kafka.log.Log.maybeRoll(Log.scala:539)
at kafka.log.Log.append(Log.scala:306)


There are other exceptions also:

[2015-03-01 13:59:18,616] ERROR Uncaught exception in scheduled task 'kafka-log-retention' (kafka.utils.KafkaScheduler)
java.io.FileNotFoundException: /cygnet/KAFKA/logs_kafka/NOTIF_CH_PROBMGMT_RESOURCE-0/00458729.index (No such file or directory)
at java.io.RandomAccessFile.open(Native Method)
at java.io.RandomAccessFile.<init>(RandomAccessFile.java:241)
at kafka.log.OffsetIndex$$anonfun$resize$1.apply(OffsetIndex.scala:277)
at kafka.log.OffsetIndex$$anonfun$resize$1.apply(OffsetIndex.scala:276)
at kafka.utils.Utils$.inLock(Utils.scala:535)
at kafka.log.OffsetIndex.resize(OffsetIndex.scala:276)
at kafka.log.OffsetIndex$$anonfun$trimToValidSize$1.apply$mcV$sp(OffsetIndex.scala:265)
at kafka.log.OffsetIndex$$anonfun$trimToValidSize$1.apply(OffsetIndex.scala:265)
at kafka.log.OffsetIndex$$anonfun$trimToValidSize$1.apply(OffsetIndex.scala:265)
at kafka.utils.Utils$.inLock(Utils.scala:535)
at kafka.log.OffsetIndex.trimToValidSize(OffsetIndex.scala:264)
at kafka.log.Log.roll(Log.scala:563)
at kafka.log.Log.deleteOldSegments(Log.scala:486)
at kafka.log.LogManager.kafka$log$LogManager$$cleanupExpiredSegments(LogManager.scala:411)
at kafka.log.LogManager$$anonfun$cleanupLogs$3.apply(LogManager.scala:442)
at kafka.log.LogManager$$anonfun$cleanupLogs$3.apply(LogManager.scala:440)
at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at 

Re: How to measure performance metrics

2015-02-24 Thread Manikumar Reddy
Hi,

There are a bunch of metrics available for performance monitoring. These
metrics can be monitored with a JMX monitoring tool such as JConsole.

https://kafka.apache.org/documentation.html#monitoring.

Some of the  available metrics reporters are:

https://cwiki.apache.org/confluence/display/KAFKA/JMX+Reporters
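
As a rough illustration of what a JMX tool does when it browses these
metrics, the sketch below asks an MBeanServerConnection for every MBean
name matching a pattern. The class name MBeanLister and the pattern used
in main are made up for this example. Against a broker you would obtain
the connection remotely and use a pattern such as kafka.server:*; here
the local JVM's platform MBean server is queried so the code runs without
a broker.

```java
import java.lang.management.ManagementFactory;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import javax.management.MBeanServerConnection;
import javax.management.ObjectName;

// Hypothetical helper, not part of Kafka.
public class MBeanLister {
    // Returns the sorted canonical names of all MBeans matching the pattern.
    public static List<String> list(MBeanServerConnection conn, String pattern)
            throws Exception {
        List<String> names = new ArrayList<>();
        for (ObjectName name : conn.queryNames(new ObjectName(pattern), null)) {
            names.add(name.getCanonicalName());
        }
        Collections.sort(names);
        return names;
    }

    public static void main(String[] args) throws Exception {
        // Local JVM only; a remote broker would need a JMXConnector instead.
        MBeanServerConnection conn = ManagementFactory.getPlatformMBeanServer();
        for (String name : list(conn, "java.lang:type=Runtime")) {
            Object uptime = conn.getAttribute(new ObjectName(name), "Uptime");
            System.out.println(name + " Uptime=" + uptime);
        }
    }
}
```

The same two calls, queryNames and getAttribute, are enough to dump any
metric that a broker or client registers in JMX.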

On Wed, Feb 25, 2015 at 11:10 AM, Bhuvana Baskar bhuvana.baska...@gmail.com
 wrote:

 Hi,

 Please let me know how to measure the performance metrics while
 pushing/consuming the message to/from the topic.

 Thanks.



Re: Custom partitioner in kafka-0.8.2.0

2015-02-19 Thread Manikumar Reddy
Hi,

In the new producer, we can specify the partition number as part of the
ProducerRecord.

From the javadocs:
*If a valid partition number is specified that partition will be used when
sending the record. If no partition is specified but a key is present a
partition will be chosen using a hash of the key. If neither key nor
partition is present a partition will be assigned in a round-robin fashion.*

http://kafka.apache.org/082/javadoc/org/apache/kafka/clients/producer/ProducerRecord.html
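
The three rules quoted from the javadoc can be sketched as plain Java.
This is only an illustration of the decision order, not the producer's
actual code: the class PartitionChoice is made up, and Arrays.hashCode is
a stand-in for whatever hash function the producer really applies to the
key.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration of the partition selection order.
public class PartitionChoice {
    private final AtomicInteger roundRobin = new AtomicInteger(0);

    // partition and key may be null, mirroring the optional record fields.
    public int choose(Integer partition, byte[] key, int numPartitions) {
        if (partition != null) {
            if (partition < 0 || partition >= numPartitions) {
                throw new IllegalArgumentException("Invalid partition: " + partition);
            }
            return partition; // rule 1: an explicit partition wins
        }
        if (key != null) {
            // rule 2: hash of the key (stand-in hash, masked to be non-negative)
            return (Arrays.hashCode(key) & 0x7fffffff) % numPartitions;
        }
        // rule 3: neither key nor partition, so cycle through the partitions
        return (roundRobin.getAndIncrement() & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        PartitionChoice chooser = new PartitionChoice();
        byte[] key = "user-1".getBytes(StandardCharsets.UTF_8);
        System.out.println("explicit:    " + chooser.choose(2, null, 4));
        System.out.println("keyed:       " + chooser.choose(null, key, 4));
        System.out.println("round-robin: " + chooser.choose(null, null, 4));
        System.out.println("round-robin: " + chooser.choose(null, null, 4));
    }
}
```

So a custom partitioning scheme can be implemented in the application by
computing the partition number itself and passing it in the record.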


ManiKumar

On Thu, Feb 19, 2015 at 6:05 PM, sunil kalva kalva.ka...@gmail.com wrote:

 Hi
 I could not find a way to customize the Partitioner class in the new KafkaProducer
 class. Is that intentional?

 tx
 SunilKalva



Re: regarding custom msg

2015-02-09 Thread Manikumar Reddy
Can you post the exception stack-trace?

On Mon, Feb 9, 2015 at 2:58 PM, Gaurav Agarwal gaurav130...@gmail.com
wrote:

 hello
 We are sending a custom message between producer and consumer, but we are
 getting a class cast exception. This works fine with a String
 message and the string encoder.
 It does not work with a custom message; I get a class cast
 exception. My message has a couple of String attributes.



Re: Not found NewShinyProducer sync performance metrics

2015-02-08 Thread Manikumar Reddy
You can configure the JMX ports with the system properties below.

 -Dcom.sun.management.jmxremote.port=
 -Dcom.sun.management.jmxremote.rmi.port=8889
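
Once both ports are fixed, a client can name them explicitly in the JMX
service URL. The sketch below only builds that URL. The class JmxUrl, the
host name and the registry port 9999 are placeholders I made up; 8889 is
the RMI port from the property above.

```java
import javax.management.remote.JMXServiceURL;

// Hypothetical helper, not part of Kafka.
public class JmxUrl {
    // Builds an RMI-based JMX service URL that pins both the RMI server
    // port (first host:port) and the registry port (inside the JNDI path).
    public static JMXServiceURL forPorts(String host, int registryPort, int rmiPort)
            throws Exception {
        return new JMXServiceURL(
            "service:jmx:rmi://" + host + ":" + rmiPort
            + "/jndi/rmi://" + host + ":" + registryPort + "/jmxrmi");
    }

    public static void main(String[] args) throws Exception {
        // Placeholder host and registry port; adjust to your broker.
        JMXServiceURL url = forPorts("broker.example.com", 9999, 8889);
        System.out.println(url);
        // A real client would pass this URL to JMXConnectorFactory.connect(url).
    }
}
```

With both ports known in advance, only those two need to be opened in the
firewall.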

On Fri, Feb 6, 2015 at 9:19 AM, Xinyi Su xiny...@gmail.com wrote:

 Hi,

 I am trying to use JConsole to connect to a remote Kafka broker that is running
 behind a firewall, but the connection is blocked by the firewall.

 I can specify the JMX registry port by setting JMX_PORT= which is allowed by
 the firewall, but I cannot specify the ephemeral port, which is always chosen
 randomly at startup. This ephemeral port is the one the JMX RMI server listens on
 and through which the actual data exchange takes place. It is randomly assigned,
 and I have no way to set it to a port that the firewall does not block.

 How can I solve this, given that JConsole cannot connect because of the firewall?

 Thanks.
 Xinyi

 On 6 February 2015 at 07:24, Otis Gospodnetic otis.gospodne...@gmail.com
 wrote:

  Not announced yet, but http://sematext.com/spm should be showing you all
  the new shiny Kafka (new producer) metrics out of the box.  If you don't
  see them, please shout (I know we have a bit more tweaking to do in the
  coming day-two-three).
 
  If you want to just dump MBeans from JMX manually and eyeball the output,
  you could use something like https://github.com/sematext/jmxc to dump
 the
  whole JMX content of your Java Consumer, Producer, or Broker.
 
  Otis
  --
  Monitoring * Alerting * Anomaly Detection * Centralized Log Management
  Solr  Elasticsearch Support * http://sematext.com/
 
 
  On Thu, Feb 5, 2015 at 5:58 AM, Manikumar Reddy ku...@nmsworks.co.in
  wrote:
 
   New Producer uses Kafka's own metrics api. Currently metrics are
 reported
   using jmx. Any jmx monitoring tool (jconsole) can be used for
 monitoring.
   On Feb 5, 2015 3:56 PM, Xinyi Su xiny...@gmail.com wrote:
  
Hi,
I am using kafka-producer-perf-test.sh to study NewShinyProducer
 *sync*
performance.
   
I have not found any CSV output or metrics collector for
  NewShinyProducer
sync performance.
   
Would you like to share with me about how to collect NewShinyProducer
metrics?
   
Thanks.
   
Best regards.
Xinyi
   
  
 



Re: one message consumed by both consumers in the same group?

2015-02-08 Thread Manikumar Reddy
Hi,

bin/kafka-console-consumer.sh --.

 all the parameters are the same


 You need to set the same group.id on both consumers to create a consumer group.
By default, the console consumer creates a random group.id.
 You can set group.id by passing the --consumer.config /tmp/consumer.props
flag.

  $ echo group.id=1 > /tmp/consumer.props


Re: Not found NewShinyProducer sync performance metrics

2015-02-05 Thread Manikumar Reddy
New Producer uses Kafka's own metrics api. Currently metrics are reported
using jmx. Any jmx monitoring tool (jconsole) can be used for monitoring.
On Feb 5, 2015 3:56 PM, Xinyi Su xiny...@gmail.com wrote:

 Hi,
 I am using kafka-producer-perf-test.sh to study NewShinyProducer *sync*
 performance.

 I have not found any CSV output or metrics collector for NewShinyProducer
 sync performance.

 Would you like to share with me about how to collect NewShinyProducer
 metrics?

 Thanks.

 Best regards.
 Xinyi



Re: Potential socket leak in kafka sync producer

2015-01-29 Thread Manikumar Reddy
I hope you are closing the producers. Can you share the attachment through
gist/pastebin?

On Fri, Jan 30, 2015 at 11:11 AM, ankit tyagi ankittyagi.mn...@gmail.com
wrote:

 Hi Jaikiran,

 I am using ubuntu and was able to reproduce on redhat too. Please find the
 more information below.


 *DISTRIB_ID=Ubuntu*
 *DISTRIB_RELEASE=12.04*
 *DISTRIB_CODENAME=precise*
 *DISTRIB_DESCRIPTION=Ubuntu 12.04.5 LTS*

 *java version 1.7.0_72*

 This is happening on the client side. The output of lsof showed that most of
 the file descriptors were FIFO and anon, but after a GC the FD count dropped significantly.

 Below is the client code that I am using to publish messages.


  private Producer<KafkaPartitionKey, KafkaEventWrapper> myProducer;

  myProducer = new Producer(new ProducerConfig(myProducerProperties));

  public void send(List<KeyedMessage<KafkaPartitionKey, KafkaEventWrapper>> msgs)
  {
      myProducer.send(msgs);
  }


 We are using the sync producer. I am attaching object histograms from before GC (histo_1)
 and after GC (histo_2) in my application.

 On Fri, Jan 30, 2015 at 9:34 AM, Jaikiran Pai jai.forums2...@gmail.com
 wrote:

 
  Which operating system are you on and what Java version? Depending on the
  OS, you could get tools (like lsof) to show which file descriptors are
  being held on to. Is it the client JVM which ends up with these leaks?
 
  Also, would it be possible to post a snippet of your application code
  which shows how you are using the Kafka APIs?
 
  -Jaikiran
  On Thursday 29 January 2015 04:36 PM, ankit tyagi wrote:
 
  Hi,
 
  Currently we are using sync producer client of 0.8.1 version in our
  production box . we are getting the following exception while publishing
  kafka message
 
  [2015-01-29 13:21:45.505][ThreadPoolTaskExecutor-603][WARN][ClientUtils$:89] Fetching topic metadata with correlation id 10808 for topics [Set(kafka_topic_coms_FD_test1)] from broker [id:0,host:localhost,port:9092] failed
  java.net.ConnectException: Connection refused
  at sun.nio.ch.Net.connect0(Native Method)
  at sun.nio.ch.Net.connect(Net.java:465)
  at sun.nio.ch.Net.connect(Net.java:457)
  at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:670)
  at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
  at kafka.producer.SyncProducer.connect(SyncProducer.scala:141)
  at kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:156)
  at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
  at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
  at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
  at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
 
 
  We are using a dynamic thread pool to publish messages to Kafka. My
  observation is that when the threads in my executor are destroyed after the
  keep-alive time, the file descriptors are somehow not released; but when I
  explicitly ran a full GC, the FD count dropped by a significant amount.
 
 
 



Re: Missing Per-Topic BrokerTopicMetrics in v0.8.2.0

2015-01-27 Thread Manikumar Reddy
I have enabled yammer's ConsoleReporter and I am getting all the metrics
(including per-topic metrics).

Yammer's MetricName object implements the equals/hashCode methods using
mBeanName. We construct a unique mBeanName for each metric, so we
are not missing or overwriting any metrics.

The current confusion is due to MetricName.name(). This will be the same
(BytesInPerSec) for both broker-level and topic-level metrics. We need to
use MetricName.getMBeanName() to differentiate between broker-level and
topic-level metrics.

0.8.1  MBeanName:
kafka.server:type=BrokerTopicMetrics,name=AllTopicsBytesInPerSec
kafka.server:type=BrokerTopicMetrics,name=MYTOPIC-BytesInPerSec

0.8.2  MBeanName:
kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec
kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec,topic=MYTOPIC
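
For a reporter that wants to tell the two 0.8.2 names apart, one option is
to parse the mBeanName with the JDK's ObjectName and look at the topic
key. This is just a sketch; the class TopicOfMBean and its method are made
up for illustration and are not part of Kafka or yammer metrics.

```java
import javax.management.ObjectName;

// Hypothetical helper, not part of Kafka or yammer metrics.
public class TopicOfMBean {
    // Returns the value of the "topic" key, or null for a broker-level metric.
    public static String topicOf(String mbeanName) throws Exception {
        return new ObjectName(mbeanName).getKeyProperty("topic");
    }

    public static void main(String[] args) throws Exception {
        String brokerLevel =
            "kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec";
        String topicLevel =
            "kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec,topic=MYTOPIC";
        System.out.println(topicOf(brokerLevel)); // no topic key present
        System.out.println(topicOf(topicLevel));  // topic key present
    }
}
```

A reporter could append the returned topic to the metric name it sends
out, so that the per-topic series do not collapse into the broker-level
one.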


ConsoleReporter's O/P:

  BytesInPerSec:  - This is broker level
 count = 1521
 mean rate = 3.63 bytes/s
 1-minute rate = 0.35 bytes/s
 5-minute rate = 2.07 bytes/s
15-minute rate = 1.25 bytes/s

  BytesInPerSec:  - This is for topic1
 count = 626
 mean rate = 1.89 bytes/s
 1-minute rate = 0.42 bytes/s
 5-minute rate = 31.53 bytes/s
15-minute rate = 64.66 bytes/s

  BytesInPerSec:  - This is for topic2
 count = 895
 mean rate = 3.62 bytes/s
 1-minute rate = 1.39 bytes/s
 5-minute rate = 30.08 bytes/s
15-minute rate = 50.27 bytes/s

Manikumar

On Tue, Jan 27, 2015 at 1:59 PM, Jason Rosenberg j...@squareup.com wrote:

 Ok,

 It looks like the yammer MetricName is not being created correctly for the
 sub metrics that include a topic. E.g. a metric with an mbeanName like:

 kafka.server:type=BrokerTopicMetrics,name=BytesRejectedPerSec,topic=mytopic

 appears to be malformed. A yammer MetricName has 4 fields that are used in
 creating a graphite metric, that are included in the constructor:
 group, type, name, scope.

 In this case, the metric with the above mbeanName has these fields set in
 the MetricName:

 group: kafka.server
 type: BrokerTopicMetrics
 name: BytesInPerSec
 scope: null

 Thus, the topic metrics all look the same, and get lumped into the
 top-level BrokerTopicMetrics (and thus that will now be double counted). It
 looks like the fix for kafka-1481 was where things got broken. It seems to
 have introduced ‘tags’ in the building of metric names, and then those tags
 only get applied to the mbeanName, but get excluded from the metric name:

 https://github.com/apache/kafka/commit/457744a820d806e546edebbd8ffd33f6772e519f

 This is a pretty severe issue, since the yammer metrics for these stats
 will be double counted in aggregate, and the per-topic stats will be
 removed.

 I should note too, in my previous email, I thought that only the per-topic
 BrokerTopicMetrics were missing, but also several other per-topic metrics
 are missing too, e.g. under kafka.log, etc.

 Jason

 On Tue, Jan 27, 2015 at 2:20 AM, Jason Rosenberg j...@squareup.com wrote:

  I can confirm that the per topic metrics are not coming through to the
  yammer metrics registry.  I do see them in jmx (via jconsole), but the
  MetricsRegistry does not have them.
  All the other metrics are coming through that appear in jmx.
 
  This is with single node instance running locally.
 
  Jason
 
 
 
  On Mon, Jan 26, 2015 at 8:30 PM, Manikumar Reddy ku...@nmsworks.co.in
  wrote:
 
  If you are using multi-node cluster, then metrics may be reported from
  other servers.
  pl check all the servers in the cluster.
 
  On Tue, Jan 27, 2015 at 4:12 AM, Kyle Banker kyleban...@gmail.com
  wrote:
 
   I've been using a custom KafkaMetricsReporter to report Kafka broker
   metrics to Graphite. In v0.8.1.1, Kafka was reporting bytes and
  messages in
   and out for all topics together and for each individual topic.
  
   After upgrading to v0.8.2.0, these metrics are no longer being
 reported.
  
   I'm only seeing the following:
   BrokerTopicMetrics
   - BytesInPerSec
   - BytesOutPerSec
   - BytesRejectedPerSec
   - MessagesInPerSec
  
   What's more, despite lots of successful writes to the cluster, the
  values
   for these remaining metrics are all zero.
  
   I saw that there was some refactoring of metric naming code. Was the
   behavior supposed to have changed?
  
   Many thanks in advance.
  
 
 
 



Re: Missing Per-Topic BrokerTopicMetrics in v0.8.2.0

2015-01-26 Thread Manikumar Reddy
If you are using a multi-node cluster, the metrics may be reported from
other servers.
Please check all the servers in the cluster.

On Tue, Jan 27, 2015 at 4:12 AM, Kyle Banker kyleban...@gmail.com wrote:

 I've been using a custom KafkaMetricsReporter to report Kafka broker
 metrics to Graphite. In v0.8.1.1, Kafka was reporting bytes and messages in
 and out for all topics together and for each individual topic.

 After upgrading to v0.8.2.0, these metrics are no longer being reported.

 I'm only seeing the following:
 BrokerTopicMetrics
 - BytesInPerSec
 - BytesOutPerSec
 - BytesRejectedPerSec
 - MessagesInPerSec

 What's more, despite lots of successful writes to the cluster, the values
 for these remaining metrics are all zero.

 I saw that there was some refactoring of metric naming code. Was the
 behavior supposed to have changed?

 Many thanks in advance.



Re: [kafka-clients] Re: [VOTE] 0.8.2.0 Candidate 2 (with the correct links)

2015-01-26 Thread Manikumar Reddy
+1 (Non-binding)
Verified source package, unit tests, release build, topic deletion,
compaction and random testing

On Mon, Jan 26, 2015 at 6:14 AM, Neha Narkhede n...@confluent.io wrote:

 +1 (binding)
 Verified keys, quick start, unit tests.

 On Sat, Jan 24, 2015 at 4:26 PM, Joe Stein joe.st...@stealth.ly wrote:

  That makes sense, thanks!
 
  On Sat, Jan 24, 2015 at 7:00 PM, Jay Kreps jay.kr...@gmail.com wrote:
 
   But I think the flaw in trying to guess what kind of serializer they will
   use is when we get it wrong. Basically let's say we guess String. Say 30%
   of the time we will be right and we will save the two configuration lines.
   70% of the time we will be wrong and the user gets a super cryptic
   ClassCastException: xyz cannot be cast to [B (because [B is how java
   chooses to display the byte array class just to up the pain), then they
   figure out how to subscribe to our mailing list and email us the cryptic
   exception, then we explain about how we helpfully set these properties for
   them to save them time. :-)

   https://www.google.com/?gws_rd=ssl#q=kafka+classcastexception+%22%5BB%22

   I think basically we did this experiment with the old clients and the
   conclusion is that serialization is something you basically have to think
   about to use Kafka and trying to guess just makes things worse.
  
   -Jay
  
   On Sat, Jan 24, 2015 at 2:51 PM, Joe Stein joe.st...@stealth.ly
 wrote:
  
   Maybe. I think the StringSerializer could look more like a typical type of
   message.  Instead of encoding being a property it would be more typically
   just written in the bytes.
  
   On Sat, Jan 24, 2015 at 12:12 AM, Jay Kreps jay.kr...@gmail.com
  wrote:
  
I don't think so--see if you buy my explanation. We previously defaulted
to the byte array serializer and it was a source of unending frustration
and confusion. Since it wasn't a required config people just went along
plugging in whatever objects they had, and thinking that changing the
parametric types would somehow help. Then they would get a class cast
exception and assume our stuff was somehow busted, not realizing we had
helpfully configured a type different from what they were passing in under
the covers. So I think it is actually good for people to think: how am I
serializing my data, and getting that exception will make them ask that
question right?
   
-Jay
   
On Fri, Jan 23, 2015 at 9:06 PM, Joe Stein joe.st...@stealth.ly
   wrote:
   
Should value.serializer in the new java producer be defaulted to
Array[Byte] ?
   
I was working on testing some upgrade paths and got this
   
! return exception in callback when buffer cannot accept message

  ConfigException: Missing required configuration value.serializer which has no default value. (ConfigDef.java:124)
  org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:124)
  org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:48)
  org.apache.kafka.clients.producer.ProducerConfig.<init>(ProducerConfig.java:235)
  org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:129)
  ly.stealth.testing.BaseSpec$class.createNewKafkaProducer(BaseSpec.scala:42)
  ly.stealth.testing.KafkaSpec.createNewKafkaProducer(KafkaSpec.scala:36)
  ly.stealth.testing.KafkaSpec$$anonfun$3$$anonfun$apply$37.apply(KafkaSpec.scala:175)
  ly.stealth.testing.KafkaSpec$$anonfun$3$$anonfun$apply$37.apply(KafkaSpec.scala:170)
   
   
   
On Fri, Jan 23, 2015 at 5:55 PM, Jun Rao j...@confluent.io wrote:
   
 This is a reminder that the deadline for the vote is this Monday,
  Jan
26,
 7pm PT.

 Thanks,

 Jun

 On Wed, Jan 21, 2015 at 8:28 AM, Jun Rao j...@confluent.io
 wrote:

 This is the second candidate for release of Apache Kafka
 0.8.2.0.
   There
 has been some changes since the 0.8.2 beta release, especially
 in
   the
new
 java producer api and jmx mbean names. It would be great if
 people
   can
test
 this out thoroughly.

 Release Notes for the 0.8.2.0 release


   
  
 
 https://people.apache.org/~junrao/kafka-0.8.2.0-candidate2/RELEASE_NOTES.html

 *** Please download, test and vote by Monday, Jan 26h, 7pm PT

 Kafka's KEYS file containing PGP keys we use to sign the
 release:
 http://kafka.apache.org/KEYS in addition to the md5, sha1 and
  sha2
 (SHA256) checksum.

 * Release artifacts to be voted upon (source and binary):
 https://people.apache.org/~junrao/kafka-0.8.2.0-candidate2/

 * Maven artifacts to be voted upon prior to release:
 https://repository.apache.org/content/groups/staging/

 * scala-doc

   

Re: [kafka-clients] [VOTE] 0.8.2.0 Candidate 2

2015-01-21 Thread Manikumar Reddy
Also Maven artifacts link is not correct

On Wed, Jan 21, 2015 at 9:50 PM, Jun Rao j...@confluent.io wrote:

 Yes, will send out a new email with the correct links.

 Thanks,

 Jun

 On Wed, Jan 21, 2015 at 3:12 AM, Manikumar Reddy ku...@nmsworks.co.in
 wrote:

 All links are pointing to
 https://people.apache.org/~junrao/kafka-0.8.2.0-candidate1/.
 They should be
 https://people.apache.org/~junrao/kafka-0.8.2.0-candidate2/ right?


 On Tue, Jan 20, 2015 at 8:32 AM, Jun Rao j...@confluent.io wrote:

 This is the second candidate for release of Apache Kafka 0.8.2.0. There
 has been some changes since the 0.8.2 beta release, especially in the
 new java producer api and jmx mbean names. It would be great if people can
 test this out thoroughly.

 Release Notes for the 0.8.2.0 release
 *https://people.apache.org/~junrao/kafka-0.8.2.0-candidate2/RELEASE_NOTES.html
 https://people.apache.org/~junrao/kafka-0.8.2.0-candidate1/RELEASE_NOTES.html*

 *** Please download, test and vote by Monday, Jan 26h, 7pm PT

 Kafka's KEYS file containing PGP keys we use to sign the release:
 *http://kafka.apache.org/KEYS http://kafka.apache.org/KEYS* in
 addition to the md5, sha1 and sha2 (SHA256) checksum.

 * Release artifacts to be voted upon (source and binary):
 *https://people.apache.org/~junrao/kafka-0.8.2.0-candidate2/
 https://people.apache.org/~junrao/kafka-0.8.2.0-candidate1/*

 * Maven artifacts to be voted upon prior to release:
 *https://repository.apache.org/content/groups/staging/
 https://repository.apache.org/content/groups/staging/*

 * scala-doc
 *https://people.apache.org/~junrao/kafka-0.8.2.0-candidate2/scaladoc/
 https://people.apache.org/~junrao/kafka-0.8.2.0-candidate1/scaladoc/#package*

 * java-doc
 *https://people.apache.org/~junrao/kafka-0.8.2.0-candidate2/javadoc/
 https://people.apache.org/~junrao/kafka-0.8.2.0-candidate1/javadoc/*

 * The tag to be voted upon (off the 0.8.2 branch) is the 0.8.2.0 tag
 *https://git-wip-us.apache.org/repos/asf?p=kafka.git;a=tag;h=058d58adef2ab2787e49d8efeefd61bb3d32f99c
 https://git-wip-us.apache.org/repos/asf?p=kafka.git;a=tag;h=058d58adef2ab2787e49d8efeefd61bb3d32f99c*
  (commit 0b312a6b9f0833d38eec434bfff4c647c1814564)

 /***

 Thanks,

 Jun

 --
 You received this message because you are subscribed to the Google
 Groups kafka-clients group.
 To unsubscribe from this group and stop receiving emails from it, send
 an email to kafka-clients+unsubscr...@googlegroups.com.
 To post to this group, send email to kafka-clie...@googlegroups.com.
 Visit this group at http://groups.google.com/group/kafka-clients.
 To view this discussion on the web visit
 https://groups.google.com/d/msgid/kafka-clients/CAFc58G-UiYefj%3Dt3C1x85m7q1xjDifTnLSnkujMpP40GHLNwag%40mail.gmail.com
 https://groups.google.com/d/msgid/kafka-clients/CAFc58G-UiYefj%3Dt3C1x85m7q1xjDifTnLSnkujMpP40GHLNwag%40mail.gmail.com?utm_medium=emailutm_source=footer
 .
 For more options, visit https://groups.google.com/d/optout.






Re: [kafka-clients] [VOTE] 0.8.2.0 Candidate 2

2015-01-21 Thread Manikumar Reddy
All links are pointing to
https://people.apache.org/~junrao/kafka-0.8.2.0-candidate1/.
They should be https://people.apache.org/~junrao/kafka-0.8.2.0-candidate2/
right?


On Tue, Jan 20, 2015 at 8:32 AM, Jun Rao j...@confluent.io wrote:

 This is the second candidate for release of Apache Kafka 0.8.2.0. There
 has been some changes since the 0.8.2 beta release, especially in the new
 java producer api and jmx mbean names. It would be great if people can test
 this out thoroughly.

 Release Notes for the 0.8.2.0 release
 *https://people.apache.org/~junrao/kafka-0.8.2.0-candidate2/RELEASE_NOTES.html
 https://people.apache.org/~junrao/kafka-0.8.2.0-candidate1/RELEASE_NOTES.html*

 *** Please download, test and vote by Monday, Jan 26h, 7pm PT

 Kafka's KEYS file containing PGP keys we use to sign the release:
 *http://kafka.apache.org/KEYS http://kafka.apache.org/KEYS* in addition
 to the md5, sha1 and sha2 (SHA256) checksum.

 * Release artifacts to be voted upon (source and binary):
 *https://people.apache.org/~junrao/kafka-0.8.2.0-candidate2/
 https://people.apache.org/~junrao/kafka-0.8.2.0-candidate1/*

 * Maven artifacts to be voted upon prior to release:
 *https://repository.apache.org/content/groups/staging/
 https://repository.apache.org/content/groups/staging/*

 * scala-doc
 *https://people.apache.org/~junrao/kafka-0.8.2.0-candidate2/scaladoc/
 https://people.apache.org/~junrao/kafka-0.8.2.0-candidate1/scaladoc/#package*

 * java-doc
 *https://people.apache.org/~junrao/kafka-0.8.2.0-candidate2/javadoc/
 https://people.apache.org/~junrao/kafka-0.8.2.0-candidate1/javadoc/*

 * The tag to be voted upon (off the 0.8.2 branch) is the 0.8.2.0 tag
 *https://git-wip-us.apache.org/repos/asf?p=kafka.git;a=tag;h=058d58adef2ab2787e49d8efeefd61bb3d32f99c
 https://git-wip-us.apache.org/repos/asf?p=kafka.git;a=tag;h=058d58adef2ab2787e49d8efeefd61bb3d32f99c*
  (commit 0b312a6b9f0833d38eec434bfff4c647c1814564)

 /***

 Thanks,

 Jun




Re: Consumer questions

2015-01-17 Thread Manikumar Reddy
AFAIK, we cannot replay messages with the high-level consumer. We need to
use the simple consumer for that.

On Sun, Jan 18, 2015 at 12:15 AM, Christopher Piggott cpigg...@gmail.com
wrote:

 Thanks.  That helped clear a lot up in my mind.

 I'm trying the high-level consumer now.  Occasionally I need to do a replay
 of the stream.  The example is:

KafkaStream.iterator();

 which starts at wherever zookeeper recorded as where you left off.

 With the high level interface, can you request an iterator that starts at
 the very beginning?



 On Fri, Jan 16, 2015 at 8:55 PM, Manikumar Reddy ku...@nmsworks.co.in
 wrote:

  Hi,
 
  1. In SimpleConsumer, you must keep track of the offsets in your
  application.
 In the example code,  readOffset  variable  can be saved in
  redis/zookeeper.
 You should plugin this logic in your code. High Level Consumer stores
  the last
 read offset information in ZooKeeper.
 
  2. You will get OffsetOutOfRange for any invalid offset.
 On error, you can decide what to do. i.e read from the latest ,
 earliest
  or some other offset.
 
  3. https://issues.apache.org/jira/browse/KAFKA-1779
 
  4. Yes
 
 
  Manikumar
 
  On Sat, Jan 17, 2015 at 2:25 AM, Christopher Piggott cpigg...@gmail.com
 
  wrote:
 
   Hi,
  
   I am following this link:
  
  
  
 
 https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example
  
   for a consumer design using kafka_2.9.2 version 0.8.1.1 (what I can
 find
  in
   maven central).  I have a couple of questions about the consumer.  I
   checked the archives and didn't see these exact questions asked
 already,
   but I may have missed them -- I apologize if that is the case.
  
  
   When I create a consumer I give it a consumer ID.  I assumed that it
  would
   store my consumer's name as well as the last readOffset in zookeeper,
 but
   looking in zookeeper that doesn't seem to be the case.  So it seems to
 me
   that when my consumers come up they need to either get the entire
 history
   from the start of time (which could take a long time, as I have 14 day
   durability); or else they need to somehow keep track of the read offset
   themselves.
  
   I have redis in my system already, so I have the choice of keeping
 track
  of
   this in either redis or zookeeper.  It seems like zookeeper would be a
   better idea.  Am I right, though, that the SimpleConsumer and the
  example I
   linked above don't keep track of this, so if I want to do that I would
  have
   to do it myself?
  
   Second question: in the example consumer, there is an error handler
 that
   checks if you received an OffsetOutOfRange response from kafka.  If so,
  it
   gets a new read offset .LatestTime().  My interpretation of this is
 that
   you have asked it for an offset which doesn't make sense, so it just
  scans
   you to the end of the stream.  That's a guaranteed data loss.  A simple
   alternative would be to take the beginning of the stream, which if you
  have
   idempotent processing would be fine - it would be a replay - but it
 could
   take a long time.
  
   I don't know for sure what would cause you to get an OffsetOutOfRange -
  the
   only thing I can really think of is that someone has changed the
  underlying
   stream on you (like they deleted and recreated it and didn't tell all
 the
   consumers).  I guess it's possible that if I have a 1 day stream
  durability
   and I stop my consumer for 3 days that it could ask for a readOffset
 that
   no longer exists; it's not clear to me whether or not that would result
  in
   an OffsetOutOfRange error or not.
  
   Does that all make sense?
  
   Third question: I set a .maxWait(1000), interpreting that to mean that when
   I make my fetch request the consumer will time out if there are no new
   messages in 1 second.  It doesn't seem to work - my call to
   consumer.fetch() seems to return immediately.  Is that expected?
  
   Final question: just to confirm:
  
   new FetchRequestBuilder().addFetch( topic, shardNum, readOffset,
   FETCH_SIZE )
  
   FETCH_SIZE is in bytes, not number of messages, so presumably it
 fetches
  as
   many messages as will fit into that many byte buffer?  Is that right?
  
   Thanks.
  
  
   Christopher Piggott
   Sr. Staff Engineer
   Golisano Institute for Sustainability
   Rochester Institute of Technology
  
 



Re: dumping JMX data

2015-01-17 Thread Manikumar Reddy
JIRAs related to the issue are

https://issues.apache.org/jira/browse/KAFKA-1680
https://issues.apache.org/jira/browse/KAFKA-1679
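
Independently of JmxTool, dumping all attributes of all MBeans can be sketched with the plain JMX API from the JDK. This is only a rough sketch: the class name JmxDump is made up for illustration, and the demo in main reads this JVM's own platform MBean server rather than a Kafka broker. For a broker you would pass in a remote MBeanServerConnection instead.

```java
import java.lang.management.ManagementFactory;
import java.util.Map;
import java.util.TreeMap;
import javax.management.MBeanAttributeInfo;
import javax.management.MBeanServerConnection;
import javax.management.ObjectName;

// Hypothetical helper, not part of Kafka.
final class JmxDump {
    /** Reads every readable attribute of every MBean visible through the connection. */
    static Map<String, Object> dumpAll(MBeanServerConnection conn) throws Exception {
        Map<String, Object> out = new TreeMap<>();
        // queryNames(null, null) selects all registered MBeans
        for (ObjectName name : conn.queryNames(null, null)) {
            for (MBeanAttributeInfo attr : conn.getMBeanInfo(name).getAttributes()) {
                if (!attr.isReadable()) {
                    continue;
                }
                String key = name + ":" + attr.getName();
                try {
                    out.put(key, conn.getAttribute(name, attr.getName()));
                } catch (Exception e) {
                    // Some attributes fail on read; record that and keep going
                    out.put(key, "<unavailable: " + e.getClass().getSimpleName() + ">");
                }
            }
        }
        return out;
    }

    public static void main(String[] args) throws Exception {
        // Demo against the local JVM only
        dumpAll(ManagementFactory.getPlatformMBeanServer())
            .forEach((k, v) -> System.out.println(k + " = " + v));
    }
}
```

Catching per-attribute failures keeps one unreadable attribute from aborting the whole dump.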

On Sun, Jan 18, 2015 at 3:12 AM, Scott Chapman sc...@woofplanet.com wrote:

 While I appreciate all the suggestions on other JMX related tools, my
 question is really about the JMXTool included in and documented in Kafka
 and how to use it to dump all the JMX data. I can get it to dump some
 mbeans, so i know my config is working. But what I can't seem to do (which
 is described in the documentation) is to dump all attributes of all
 objects.

 Please, does anyone using it have any experience with it that might help
 me?

 Thanks in advance!

 On Sat Jan 17 2015 at 12:39:56 PM Albert Strasheim full...@gmail.com
 wrote:

  On Fri, Jan 16, 2015 at 5:52 PM, Joe Stein joe.st...@stealth.ly wrote:
   Here are some more tools for that
   https://cwiki.apache.org/confluence/display/KAFKA/JMX+Reporters
  depending
   on what you have in place and what you are trying to do, different
   options exist.
  
   A lot of folks like JMX Trans.
 
  We tried JMX Trans for a while, but didn't like it very much.
 
  Jolokia looks promising. Trying that now.
 
  http://www.jolokia.org/
 



Re: Consumer questions

2015-01-16 Thread Manikumar Reddy
Hi,

1. In SimpleConsumer, you must keep track of the offsets in your application.
   In the example code, the readOffset variable can be saved in Redis or
   ZooKeeper. You should plug this logic into your code. The High Level
   Consumer stores the last read offset in ZooKeeper.

2. You will get OffsetOutOfRange for any invalid offset.
   On error, you can decide what to do, i.e. read from the latest, the
   earliest, or some other offset.

3. https://issues.apache.org/jira/browse/KAFKA-1779

4. Yes
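
Points 1 and 2 can be sketched roughly as below. Everything here is an assumption for illustration: OffsetTracker and ResetPolicy are made-up names, the in-memory map stands in for Redis or ZooKeeper, and the earliest/latest bounds are assumed to have been fetched from the broker beforehand. No Kafka API is called.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper: remembers the next offset to read per topic/partition.
final class OffsetTracker {
    /** What to do when the saved offset is missing or outside the valid range. */
    enum ResetPolicy { EARLIEST, LATEST }

    // In-memory stand-in for Redis or ZooKeeper; a real consumer needs a durable store
    private final Map<String, Long> store = new HashMap<>();

    void save(String topic, int partition, long nextOffset) {
        store.put(topic + "-" + partition, nextOffset);
    }

    /**
     * Returns the offset to fetch from. A saved offset outside [earliest, latest]
     * is handled the way an OffsetOutOfRange error would be: fall back to the policy.
     */
    long resolve(String topic, int partition, long earliest, long latest, ResetPolicy policy) {
        Long saved = store.get(topic + "-" + partition);
        if (saved != null && saved >= earliest && saved <= latest) {
            return saved; // resume where we left off
        }
        // EARLIEST replays retained data; LATEST skips it (possible data loss)
        return policy == ResetPolicy.EARLIEST ? earliest : latest;
    }

    public static void main(String[] args) {
        OffsetTracker tracker = new OffsetTracker();
        tracker.save("events", 0, 120L);
        // Saved offset still retained: resume from it
        System.out.println(tracker.resolve("events", 0, 100L, 500L, ResetPolicy.EARLIEST));
        // Saved offset already deleted by retention: fall back to earliest
        System.out.println(tracker.resolve("events", 0, 200L, 500L, ResetPolicy.EARLIEST));
    }
}
```

With idempotent processing, EARLIEST gives a replay instead of a gap, at the cost of re-reading everything still retained.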


Manikumar

On Sat, Jan 17, 2015 at 2:25 AM, Christopher Piggott cpigg...@gmail.com
wrote:

 Hi,

 I am following this link:


 https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example

 for a consumer design using kafka_2.9.2 version 0.8.1.1 (what I can find in
 maven central).  I have a couple of questions about the consumer.  I
 checked the archives and didn't see these exact questions asked already,
 but I may have missed them -- I apologize if that is the case.


 When I create a consumer I give it a consumer ID.  I assumed that it would
 store my consumer's name as well as the last readOffset in zookeeper, but
 looking in zookeeper that doesn't seem to be the case.  So it seems to me
 that when my consumers come up they need to either get the entire history
 from the start of time (which could take a long time, as I have 14 day
 durability); or else they need to somehow keep track of the read offset
 themselves.

 I have redis in my system already, so I have the choice of keeping track of
 this in either redis or zookeeper.  It seems like zookeeper would be a
 better idea.  Am I right, though, that the SimpleConsumer and the example I
 linked above don't keep track of this, so if I want to do that I would have
 to do it myself?

 Second question: in the example consumer, there is an error handler that
 checks if you received an OffsetOutOfRange response from kafka.  If so, it
 gets a new read offset .LatestTime().  My interpretation of this is that
 you have asked it for an offset which doesn't make sense, so it just scans
 you to the end of the stream.  That's a guaranteed data loss.  A simple
 alternative would be to take the beginning of the stream, which if you have
 idempotent processing would be fine - it would be a replay - but it could
 take a long time.

 I don't know for sure what would cause you to get an OffsetOutOfRange - the
 only thing I can really think of is that someone has changed the underlying
 stream on you (like they deleted and recreated it and didn't tell all the
 consumers).  I guess it's possible that if I have a 1 day stream durability
 and I stop my consumer for 3 days that it could ask for a readOffset that
 no longer exists; it's not clear to me whether or not that would result in
 an OffsetOutOfRange error or not.

 Does that all make sense?

 Third question: I set a .maxWait(1000) interpreting that to mean that when
 I make my fetch request the consumer will time out if there are no new
 messages in 1 second.  It doesn't seem to work - my call to
 consumer.fetch() seems to return immediately.  Is that expected?

 Final question: just to confirm:

 new FetchRequestBuilder().addFetch( topic, shardNum, readOffset,
 FETCH_SIZE )

 FETCH_SIZE is in bytes, not number of messages, so presumably it fetches as
 many messages as will fit into that many byte buffer?  Is that right?

 Thanks.


 Christopher Piggott
 Sr. Staff Engineer
 Golisano Institute for Sustainability
 Rochester Institute of Technology



Re: Question on running Kafka Producer in Java environment

2015-01-16 Thread Manikumar Reddy
Please check your classpath. Some jars might be missing.
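
One thing worth knowing here: when java is started with -jar, the class path comes from the jar's manifest and the -cp option is ignored, so jars listed only on -cp are not visible. A small check like the sketch below, run the same way as the real program, can show which classes are actually reachable. ClasspathCheck is a made-up name; the two class names probed in main are just the ones relevant to this thread.

```java
// Hypothetical diagnostic, not part of Kafka.
final class ClasspathCheck {
    /** Returns true if the named class can be located by this class's loader. */
    static boolean isOnClasspath(String className) {
        try {
            // initialize=false: only locate the class, do not run static initializers
            Class.forName(className, false, ClasspathCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("java.class.path = " + System.getProperty("java.class.path"));
        String[] names = {"kafka.producer.ProducerConfig", "org.slf4j.Logger"};
        for (String name : names) {
            System.out.println(name + " -> " + (isOnClasspath(name) ? "found" : "MISSING"));
        }
    }
}
```

If a class shows as MISSING, either add the jar to the Class-Path entry of the manifest, or drop -jar and run the main class by name with every jar (including test.jar) on -cp.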

On Sat, Jan 17, 2015 at 7:41 AM, Su She suhsheka...@gmail.com wrote:

 Hello Everyone,

 Thank you for the time and help. I had the Kafka Producer running, but am
 having some trouble now.

 1) Using Maven, I wrote a Kafka Producer similar to the one found here:

 https://github.com/pppsunil/HelloKafka/blob/master/src/main/java/com/spnotes/kafka/HelloKafkaProducer.java

 2) I am using these jar files: kafka_2.10-0.8.2-beta.jar,
 kafka-clients-0.8.2-beta.jar, metrics-core-2.2.0.jar,
 scala-library-2.10.4.jar, slf4j-api-1.5.6.jar, slf4j-log4j12-1.5.6.jar

 3) I used to submit it via spark-submit script on my ec2-instance, but I
 realized it was not good practice and want to submit it via the java
 script.

 4) I ran javac -cp <jar files> HKP.java and got a .class file.

 5) I ran jar cmf manifest.mf test.jar HKP.class (the manifest contains just
 the main class and version) and got the jar file test.jar.

 6) I try to run it with java -cp <jar files> -jar test.jar and I am getting
 NoClassDefFoundErrors. First I had to download the slf4j-api jar, and now I
 am getting NoClassDefFoundError: kafka/producer/ProducerConfig, even though
 I have the Kafka jar files needed and these 4 jar files worked when running
 through the spark-submit script.

 Any suggestions are much appreciated!

 Thanks!



Re: [VOTE] 0.8.2.0 Candidate 1

2015-01-15 Thread Manikumar Reddy
Also can we remove delete.topic.enable config property and enable topic
deletion by default?
On Jan 15, 2015 10:07 PM, Jun Rao j...@confluent.io wrote:

 Thanks for reporting this. I will remove that option in RC2.

 Jun

 On Thu, Jan 15, 2015 at 5:21 AM, Jaikiran Pai jai.forums2...@gmail.com
 wrote:

  I just downloaded the Kafka binary and am trying this on my 32 bit JVM
  (Java 7). Trying to start Zookeeper or the Kafka server keeps failing with
  Unrecognized VM option 'UseCompressedOops':
 
  ./zookeeper-server-start.sh ../config/zookeeper.properties
  Unrecognized VM option 'UseCompressedOops'
  Error: Could not create the Java Virtual Machine.
  Error: A fatal exception has occurred. Program will exit.
 
  Same with the Kafka server startup scripts. My Java version is:
 
  java version 1.7.0_71
  Java(TM) SE Runtime Environment (build 1.7.0_71-b14)
  Java HotSpot(TM) Server VM (build 24.71-b01, mixed mode)
 
  Should there be a check in the script, before adding this option?
 
  -Jaikiran
 
  On Wednesday 14 January 2015 10:08 PM, Jun Rao wrote:
 
  + users mailing list. It would be great if people can test this out and
  report any blocker issues.
 
  Thanks,
 
  Jun
 
  On Tue, Jan 13, 2015 at 7:16 PM, Jun Rao j...@confluent.io wrote:
 
   This is the first candidate for release of Apache Kafka 0.8.2.0. There
  have been some changes since the 0.8.2 beta release, especially in the new
  java producer api and jmx mbean names. It would be great if people can test
  this out thoroughly. We are giving people 10 days for testing and voting.
 
  Release Notes for the 0.8.2.0 release
  https://people.apache.org/~junrao/kafka-0.8.2.0-candidate1/RELEASE_NOTES.html
 
  *** Please download, test and vote by Friday, Jan 23rd, 7pm PT
 
  Kafka's KEYS file containing PGP keys we use to sign the release:
  https://people.apache.org/~junrao/kafka-0.8.2.0-candidate1/KEYS in
  addition to the md5, sha1 and sha2 (SHA256) checksum.
 
  * Release artifacts to be voted upon (source and binary):
  https://people.apache.org/~junrao/kafka-0.8.2.0-candidate1/
 
  * Maven artifacts to be voted upon prior to release:
  https://people.apache.org/~junrao/kafka-0.8.2.0-candidate1/maven_staging/
 
  * scala-doc
  https://people.apache.org/~junrao/kafka-0.8.2.0-candidate1/scaladoc/#package
 
  * java-doc
  https://people.apache.org/~junrao/kafka-0.8.2.0-candidate1/javadoc/
 
  * The tag to be voted upon (off the 0.8.2 branch) is the 0.8.2.0 tag
  https://git-wip-us.apache.org/repos/asf?p=kafka.git;a=tag;h=b0c7d579f8aeb5750573008040a42b7377a651d5
 
  /***
 
  Thanks,
 
  Jun
 
 
 



Re: Delete topic

2015-01-14 Thread Manikumar Reddy
I think now we should delete this config property and allow topic deletion
in 0.8.2
 Yep, you need to set delete.topic.enable=true.

Forgot that step :)

2015-01-14 10:16 GMT-08:00 Jayesh Thakrar j_thak...@yahoo.com.invalid:
 Does one also need to set the config parameter delete.topic.enable to
true? I am using 0.8.2 beta and I had to set it to true to enable topic
deletion.
   From: Armando Martinez Briones arma...@tralix.com
  To: users@kafka.apache.org
  Sent: Wednesday, January 14, 2015 11:33 AM
  Subject: Re: Delete topic

 thanks Gwen Shapira ;)

 On 14 January 2015, 11:31, Gwen Shapira gshap...@cloudera.com
 wrote:

 At the moment, the best way would be:

 * Wait about two weeks
 * Upgrade to 0.8.2
 * Use kafka-topics.sh --delete

 :)

 2015-01-14 9:26 GMT-08:00 Armando Martinez Briones arma...@tralix.com:
  Hi.
 
  What is the best way to delete a topic in a production environment?
 
  --
  José Armando Martínez Briones
  *Software Architect*
  Tralix México
  Tel: +52 442 161 1002 ext. 2920
  www.tralix.com.mx
  https://twitter.com/#!/tralix
  http://www.facebook.com/pages/Tralix/47723192646
  *TRALIX MÉXICO S. DE R.L. DE C.V*, as the party responsible for processing
  your personal data, informs you that information obtained through this
  medium is treated as strictly confidential, and that we will collect and
  process your data according to the guidelines of our Privacy Notice
  http://tralix.com.mx/politicasdeprivacidad/avisodeprivacidad.html. The
  full version is available on the Privacy Notice page. By reading this
  Privacy Notice and not objecting to the processing of your Personal Data,
  you are understood to have read the full version and to accept its terms,
  always in accordance with the Ley Federal de Protección de Datos
  Personales en Posesión de los Particulares.










Re: Configuring location for server (log4j) logs

2015-01-14 Thread Manikumar Reddy
You just need to set the LOG_DIR environment variable. All logs will be
redirected to the LOG_DIR directory.

On Thu, Jan 15, 2015 at 11:49 AM, Shannon Lloyd shanl...@gmail.com wrote:

 By default Kafka writes its server logs into a logs directory underneath
 the installation root. I'm trying to override this to get it to write logs
 to an external location so that I can separate all the read/write logs/data
 from the read-only binaries. But I can't get it to work correctly.

 I've set the following in my startup script to try to get it to log to
 /data/kafka/log:

 export
 KAFKA_LOG4J_OPTS=-Dlog4j.configuration=file:/etc/kafka/log4j.properties
 -Dkafka.logs.dir=/data/kafka/log

 In /etc/kafka/log4j.properties I've also set kafka.logs.dir to the same
 location as specified above. If I only specify it in the properties file
 and not via KAFKA_LOG4J_OPTS, it doesn't work (all of the logs keep going
 to the default location).

 And this works... sort of. It sends all logs where I want *except* for
 kafkaServer-gc.log and kafkaServer.out, both of which still go to logs
 under the installation root.

 Am I missing something? Or can I simply not redirect these two files so
 that ALL my logs go somewhere outside the installation location?

 Thanks.



Re: Javadoc errors in MetricName when building with Java 8

2015-01-14 Thread Manikumar Reddy
Thanks for reporting this issue. We should be able to build on java 8. Will
correct the javadocs.
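
For reference, a comment shape that the stricter Java 8 javadoc tool accepts looks roughly like the sketch below. DocExample is a made-up stand-in, not the real MetricName class; the point is only the plain <p> tag and the {@code ...} wrapper around generic types.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * Illustrative stand-in for a documented class (not the real MetricName).
 * <p>
 * A paragraph break uses a plain p tag rather than a self-closing one, and
 * code containing angle brackets is wrapped so it is not parsed as HTML:
 * <pre>{@code
 * Map<String, String> metricTags = new LinkedHashMap<String, String>();
 * metricTags.put("client-id", "producer-1");
 * }</pre>
 */
final class DocExample {
    /** Builds an ordered tag map holding a single client-id entry. */
    static Map<String, String> tags(String clientId) {
        Map<String, String> metricTags = new LinkedHashMap<String, String>();
        metricTags.put("client-id", clientId);
        return metricTags;
    }

    public static void main(String[] args) {
        System.out.println(tags("producer-1"));
    }
}
```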

On Wed, Jan 14, 2015 at 9:26 AM, Shannon Lloyd shanl...@gmail.com wrote:

 Is Java 8 supported for building Kafka? Or do you only support Java 7? I
 just noticed that the latest code on the 0.8.2 branch fails on the javadoc
 in the new MetricName class due to Java 8's javadoc tool being much
 stricter when checking comments (especially around things like HTML tags).

 For example:

 kafka/clients/src/main/java/org/apache/kafka/common/MetricName.java:22:
 error: self-closing element not allowed
  * <p/>
    ^
 kafka/clients/src/main/java/org/apache/kafka/common/MetricName.java:39:
 error: malformed HTML
  * Map<String, String> metricTags = new LinkedHashMap<String, String>();
       ^
 kafka/clients/src/main/java/org/apache/kafka/common/MetricName.java:39:
 error: bad use of '>'
  * Map<String, String> metricTags = new LinkedHashMap<String, String>();

 I realise that you can avoid these errors via -Xdoclint, but it seems like
 it would be preferable to fix up the javadoc instead.

 Cheers,
 Shannon



Re: Get replication and partition count of a topic

2015-01-12 Thread Manikumar Reddy
Hi,

kafka-topics.sh script can be used to retrieve topic information.

Ex: sh kafka-topics.sh --zookeeper localhost:2181 --describe --topic TOPIC1

You can look into TopicCommand.scala code
https://git-wip-us.apache.org/repos/asf?p=kafka.git;a=blob;f=core/src/main/scala/kafka/admin/TopicCommand.scala;hb=HEAD
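
If you call the script from a program, the two counts can be pulled out of the summary line of the --describe output. The sketch below assumes that line consists of whitespace-separated key:value fields such as "Topic:TOPIC1 PartitionCount:3 ReplicationFactor:2 Configs:". That format is an assumption and may differ between versions, and DescribeSummary is a made-up name.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical parser for the assumed summary-line format; not part of Kafka.
final class DescribeSummary {
    /** Splits whitespace-separated key:value tokens into a map. */
    static Map<String, String> parse(String line) {
        Map<String, String> fields = new HashMap<>();
        for (String token : line.trim().split("\\s+")) {
            int colon = token.indexOf(':');
            if (colon > 0) {
                fields.put(token.substring(0, colon), token.substring(colon + 1));
            }
        }
        return fields;
    }

    public static void main(String[] args) {
        // Sample line written by hand for the demo, not captured from a broker
        String sample = "Topic:TOPIC1\tPartitionCount:3\tReplicationFactor:2\tConfigs:";
        Map<String, String> fields = parse(sample);
        System.out.println("partitions  = " + Integer.parseInt(fields.get("PartitionCount")));
        System.out.println("replication = " + Integer.parseInt(fields.get("ReplicationFactor")));
    }
}
```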

On Mon, Jan 12, 2015 at 2:01 PM, Ankit Jain ankitm.j...@impetus.co.in
wrote:

 Hi All,


 I want to get the replication and partition count of a topic. I tried the
 following piece of code:


 java.util.Set<String> topics = new HashSet<String>();
 topics.add(topicName);
 // Returns a Scala Set of TopicMetadata, one entry per requested topic
 Set<TopicMetadata> topicMetadatas =
     AdminUtils.fetchTopicMetadataFromZk(JavaConversions.asScalaSet(topics),
     zkClient);
 Iterator<TopicMetadata> topicMetadataIterator =
     topicMetadatas.iterator();

 while (topicMetadataIterator.hasNext()) {
     // Call next() once per iteration; every call advances the iterator
     TopicMetadata topicMetadata = topicMetadataIterator.next();
     Iterator<PartitionMetadata> partitionMetadataIterator =
         topicMetadata.partitionsMetadata().iterator();

 }


 But the above code returns the metadata of each partition and the
 replica details of each partition.


 Is there any simple API available in kafka to get the partition and
 replica count for a topic?


 Thanks,

 Ankit


 









Re: Kafka broker shutting down after running fine for 1-2 hours

2015-01-10 Thread Manikumar Reddy
Are you running kafka as a non-daemon process?
If yes, the process may get killed when the terminal is closed.

On Sat, Jan 10, 2015 at 9:31 PM, Manikumar Reddy ku...@nmsworks.co.in
wrote:

 Are you seeing any errors/exceptions? Can you paste Kafka log output?

 On Sat, Jan 10, 2015 at 2:42 PM, Kartik Singh kartiksi...@giveter.com
 wrote:

 Hello,
 We have just started using kafka. Our test setup consists of a single
 partition. We have integrated kafka to our system successfully with the
 help of sample Producer example and Low-level Consumer example. The kafka
 broker and zookeeper are both running with default configurations.
 The setup works fine for a couple of hours and then it just shuts down. Is
 there some shutdown on idle property in config file that prompts the
 shutdown. How can I prevent it from shutting down, Any help would be
 highly
 appreciated.

 Link to StackOverFlow Question
 
 http://stackoverflow.com/questions/27873292/kafka-server-shutting-down-abruptly
 
 Logs https://gist.github.com/kartiksingh/6db4b6548f2eb91ef0b6

 Regards
 Kartik





Re: Kafka broker shutting down after running fine for 1-2 hours

2015-01-10 Thread Manikumar Reddy
Are you seeing any errors/exceptions? Can you paste Kafka log output?

On Sat, Jan 10, 2015 at 2:42 PM, Kartik Singh kartiksi...@giveter.com
wrote:

 Hello,
 We have just started using kafka. Our test setup consists of a single
 partition. We have integrated kafka to our system successfully with the
 help of sample Producer example and Low-level Consumer example. The kafka
 broker and zookeeper are both running with default configurations.
 The setup works fine for a couple of hours and then it just shuts down. Is
 there some shutdown on idle property in config file that prompts the
 shutdown. How can I prevent it from shutting down, Any help would be highly
 appreciated.

 Link to StackOverFlow Question
 
 http://stackoverflow.com/questions/27873292/kafka-server-shutting-down-abruptly
 
 Logs https://gist.github.com/kartiksingh/6db4b6548f2eb91ef0b6

 Regards
 Kartik



Re: Kafka broker shutting down after running fine for 1-2 hours

2015-01-10 Thread Manikumar Reddy
Sorry.. i missed your link.

On Sat, Jan 10, 2015 at 9:31 PM, Manikumar Reddy ku...@nmsworks.co.in
wrote:

 Are you seeing any errors/exceptions? Can you paste Kafka log output?

 On Sat, Jan 10, 2015 at 2:42 PM, Kartik Singh kartiksi...@giveter.com
 wrote:

 Hello,
 We have just started using kafka. Our test setup consists of a single
 partition. We have integrated kafka to our system successfully with the
 help of sample Producer example and Low-level Consumer example. The kafka
 broker and zookeeper are both running with default configurations.
 The setup works fine for a couple of hours and then it just shuts down. Is
 there some shutdown on idle property in config file that prompts the
 shutdown. How can I prevent it from shutting down, Any help would be
 highly
 appreciated.

 Link to StackOverFlow Question
 
 http://stackoverflow.com/questions/27873292/kafka-server-shutting-down-abruptly
 
 Logs https://gist.github.com/kartiksingh/6db4b6548f2eb91ef0b6

 Regards
 Kartik





Re: kafka monitoring

2015-01-08 Thread Manikumar Reddy
Hi,
  You need to set the JMX remote port.

  You can set it by executing the line below in a terminal and then starting
  the server, or by adding the line to kafka-run-class.sh and starting the
  server.

  export JMX_PORT=<jmx remote port>

Then connect jconsole by giving brokerip:<jmx remote port>
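
The same connection can also be made from code with the JDK's JMX remote API, which may help when checking the port from a machine without jconsole. This is a sketch under assumptions: RemoteJmx is a made-up name, the broker is assumed to expose the default RMI connector on the JMX port without authentication, and the "*kafka*:*" pattern is just a guess at a filter that matches Kafka's MBean domains.

```java
import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

// Hypothetical helper, not part of Kafka.
final class RemoteJmx {
    /** Builds the usual RMI-based JMX service URL for host:port. */
    static JMXServiceURL urlFor(String host, int port) throws java.net.MalformedURLException {
        return new JMXServiceURL("service:jmx:rmi:///jndi/rmi://" + host + ":" + port + "/jmxrmi");
    }

    public static void main(String[] args) throws Exception {
        if (args.length != 2) {
            System.out.println("usage: java RemoteJmx <broker-host> <jmx-port>");
            return;
        }
        JMXServiceURL url = urlFor(args[0], Integer.parseInt(args[1]));
        try (JMXConnector connector = JMXConnectorFactory.connect(url)) {
            MBeanServerConnection conn = connector.getMBeanServerConnection();
            // Print every MBean whose domain mentions kafka
            for (ObjectName name : conn.queryNames(new ObjectName("*kafka*:*"), null)) {
                System.out.println(name);
            }
        }
    }
}
```

The long service URL form is what jconsole's "service:jmx:protocol:sap" option expects; the short host:port form is equivalent for the default connector.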

On Fri, Jan 9, 2015 at 12:38 AM, Sa Li sal...@gmail.com wrote:

 Hello, All

 I understand many of you are using jmxtrans along with graphite/ganglia to
 pull out metrics. According to https://kafka.apache.org/081/ops.html,
 "The easiest way to see the available metrics is to fire up jconsole and
 point it at a running kafka client or server; this will allow browsing all
 metrics with JMX."

 I tried to fire up a jconsole on windows attempting to access our dev and
 production cluster which are running good,
 here is the main node of my dev:
 10.100.75.128, broker port:9092, zk port:2181

 Jconsole shows:

  New Connection
 Remote Process:

 Usage: hostname:port OR service:jmx:protocol:sap
 Username:Password:

 Sorry for the naive question. I tried to connect using the IP above but
 could not connect. Do I need to do something on the dev server to make it
 work?

 thanks

 --

 Alec Li



Re: ProducerData jar file

2014-12-11 Thread Manikumar Reddy
Hi,

You just need to include the libraries available in kafka/libs folder.

Please follow the example below:
https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+Producer+Example




On Thu, Dec 11, 2014 at 4:43 PM, kishore kumar akishore...@gmail.com
wrote:

 Do I need to download this separately? My requirement is to ingest the data
 in my csv files into kafka. Please help me do this with java code.

 On Thu, Dec 11, 2014 at 1:30 PM, kishore kumar akishore...@gmail.com
 wrote:

 
  Hi Experts,
 
  Is the kafka.javaapi.producer.ProducerData class available in 0.8.1? I am
  unable to use it. I copied all the jars in the /build/dependant-libs-2.10.1
  and libs folders. Any help?
  --
  Thanks,
  Kishore.
 



 --
 Thanks,
 Kishore.



Re: ProducerData jar file

2014-12-11 Thread Manikumar Reddy
Hi,
  The kafka.javaapi.producer.ProducerData class belongs to kafka 0.7/0.6.
  It was removed from 0.8 onwards. Please try the 0.8.x API.


Regards,
Manikumar

On Thu, Dec 11, 2014 at 8:10 PM, kishore kumar akishore...@gmail.com
wrote:

 hi Manikumar,

 As I mentioned in previous mail, I added the jars available in libs folder,
 but this class is not available in that jars, I am using cloudera's
 CLABS-KAFKA.

 On Thu, Dec 11, 2014 at 4:55 PM, Manikumar Reddy ku...@nmsworks.co.in
 wrote:

  Hi,
 
  You just need to include the libraries available in kafka/libs folder.
 
  Pl follow below example
  https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+Producer+Example
 
 
 
 
  On Thu, Dec 11, 2014 at 4:43 PM, kishore kumar akishore...@gmail.com
  wrote:
 
   do i need to download this separately ? my requirement is ingest the
 data
   in  my csv files into kafka, please help me how to do it with java
 code.
  
   On Thu, Dec 11, 2014 at 1:30 PM, kishore kumar akishore...@gmail.com
   wrote:
  
   
Hi Experts,
   
kafka.javaapi.producer.ProducerData class is available in 0.8.1 ? I
 am
unable to use it, I copied all the jars in
 /build/dependant-libs-2.10.1
   and
libs folders, any help ?
--
Thanks,
Kishore.
   
  
  
  
   --
   Thanks,
   Kishore.
  
 



 --
 Thanks,
 Kishore.



Re: Pagecache cause OffsetOutOfRangeException

2014-12-02 Thread Manikumar Reddy
You can check the latest/earliest offsets of a given topic by running
GetOffsetShell.

https://cwiki.apache.org/confluence/display/KAFKA/System+Tools#SystemTools-GetOffsetShell

On Tue, Dec 2, 2014 at 2:05 PM, yuanjia8947 yuanjia8...@163.com wrote:

 Hi all,
 I'm using the kafka 0.8.0 release now, and I often encounter an
 OffsetOutOfRangeException when consuming messages with the simple consumer API.
 But I'm sure that the consuming offset is smaller than the latest offset
 obtained from OffsetRequest.
 Can it be caused by new messages being written to the kernel's pagecache
 and not yet flushed to the file,
 while I'm consuming new messages from the file?
 How can I fix it?

 Thanks,
 liyuanjia





 liyuanjia


Re: Kafka 0.8.2 log cleaner

2014-11-30 Thread Manikumar Reddy
Log cleaner does not support topics with compressed messages.

https://issues.apache.org/jira/browse/KAFKA-1374

On Sun, Nov 30, 2014 at 5:33 PM, Mathias Söderberg 
mathias.soederb...@gmail.com wrote:

 Does the log cleaner in 0.8.2 support topics with compressed messages? IIRC
 that wasn't supported in 0.8.1.1.

 On 29 November 2014 at 17:23, Jun Rao jun...@gmail.com wrote:

  Yes, log cleaner is in 0.8.2. You just need to set the retention policy
 of
  a topic to compact.
 
  Thanks,
 
  Jun
 
  On Thu, Nov 27, 2014 at 5:20 AM, Khandygo, Evgeny (EXT) 
  evgeny.khandygo@siemens.com wrote:
 
   I’m wondering if you could tell me whether log cleaner implemented in
   0.8.2 because it seems like it didn’t.
  
   Thanks
   John
  
  
 



Re: [DISCUSSION] adding the serializer api back to the new java producer

2014-11-25 Thread Manikumar Reddy
+1 for this change.

What about the deserializer class in 0.8.2? Say I am using the new producer
with Avro together with the old consumer. Then I need to provide a custom
Decoder implementation for Avro, right?
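
To make the question concrete: whatever the producer side serializes, the consumer side needs the matching inverse. A minimal sketch of such a pair is below. The interfaces are hypothetical and are not the API proposed in this thread; UTF-8 strings stand in for Avro to keep the example dependency-free.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical types for illustration only; not the proposed Kafka interfaces.
final class SerializerSketch {
    interface SketchSerializer<T> {
        byte[] serialize(T value);
    }

    interface SketchDeserializer<T> {
        T deserialize(byte[] bytes);
    }

    /** One class implementing both directions so the two cannot drift apart. */
    static final class Utf8StringCodec
            implements SketchSerializer<String>, SketchDeserializer<String> {
        @Override
        public byte[] serialize(String value) {
            return value == null ? null : value.getBytes(StandardCharsets.UTF_8);
        }

        @Override
        public String deserialize(byte[] bytes) {
            return bytes == null ? null : new String(bytes, StandardCharsets.UTF_8);
        }
    }

    public static void main(String[] args) {
        Utf8StringCodec codec = new Utf8StringCodec();
        byte[] wire = codec.serialize("hello kafka");
        // Round trip: what the producer wrote is what the consumer reads
        System.out.println(codec.deserialize(wire));
    }
}
```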

On Tue, Nov 25, 2014 at 9:19 PM, Joe Stein joe.st...@stealth.ly wrote:

 The serializer is an expected use of the producer/consumer now and think we
 should continue that support in the new client. As far as breaking the API
 it is why we released the 0.8.2-beta to help get through just these type of
 blocking issues in a way that the community at large could be involved in
 easier with a build/binaries to download and use from maven also.

 +1 on the change now prior to the 0.8.2 release.

 - Joe Stein


 On Mon, Nov 24, 2014 at 11:43 PM, Sriram Subramanian 
 srsubraman...@linkedin.com.invalid wrote:

  Looked at the patch. +1 from me.
 
  On 11/24/14 8:29 PM, Gwen Shapira gshap...@cloudera.com wrote:
 
  As one of the people who spent too much time building Avro repositories,
  +1
  on bringing serializer API back.
  
  I think it will make the new producer easier to work with.
  
  Gwen
  
  On Mon, Nov 24, 2014 at 6:13 PM, Jay Kreps jay.kr...@gmail.com wrote:
  
   This is admittedly late in the release cycle to make a change. To add
 to
   Jun's description the motivation was that we felt it would be better
 to
   change that interface now rather than after the release if it needed
 to
   change.
  
   The motivation for wanting to make a change was the ability to really
 be
   able to develop support for Avro and other serialization formats. The
   current status is pretty scattered--there is a schema repository on an
  Avro
   JIRA and another fork of that on github, and a bunch of people we have
   talked to have done similar things for other serialization systems. It
   would be nice if these things could be packaged in such a way that it
  was
   possible to just change a few configs in the producer and get rich
  metadata
   support for messages.
  
   As we were thinking this through we realized that the new api we were
  about
   to introduce was kind of not very compatible with this since it was
 just
   byte[] oriented.
  
   You can always do this by adding some kind of wrapper api that wraps
 the
   producer. But this puts us back in the position of trying to document
  and
   support multiple interfaces.
  
   This also opens up the possibility of adding a MessageValidator or
   MessageInterceptor plug-in transparently so that you can do other
 custom
   validation on the messages you are sending which obviously requires
  access
   to the original object not the byte array.
  
   This api doesn't prevent using byte[] by configuring the
   ByteArraySerializer it works as it currently does.
  
   -Jay
  
   On Mon, Nov 24, 2014 at 5:58 PM, Jun Rao jun...@gmail.com wrote:
  
Hi, Everyone,
   
I'd like to start a discussion on whether it makes sense to add the
serializer api back to the new java producer. Currently, the new
 java
producer takes a byte array for both the key and the value. While
 this
   api
is simple, it pushes the serialization logic into the application.
  This
makes it hard to reason about what type of data is being sent to
 Kafka
   and
also makes it hard to share an implementation of the serializer. For
example, to support Avro, the serialization logic could be quite
  involved
since it might need to register the Avro schema in some remote
  registry
   and
maintain a schema cache locally, etc. Without a serialization api,
  it's
impossible to share such an implementation so that people can easily
   reuse.
We sort of overlooked this implication during the initial discussion
  of
   the
producer api.
   
So, I'd like to propose an api change to the new producer by adding
  back
the serializer api similar to what we had in the old producer.
  Specifically,
the proposed api changes are the following.
   
First, we change KafkaProducer to take generic types K and V for the
  key
and the value, respectively.
   
public class KafkaProducer<K,V> implements Producer<K,V> {
   
    public Future<RecordMetadata> send(ProducerRecord<K,V> record,
        Callback callback);
   
    public Future<RecordMetadata> send(ProducerRecord<K,V> record);
}
   
Second, we add two new configs, one for the key serializer and
 another
   for
the value serializer. Both serializers will default to the byte
 array
implementation.
   
public class ProducerConfig extends AbstractConfig {
   
.define(KEY_SERIALIZER_CLASS_CONFIG, Type.CLASS,
"org.apache.kafka.clients.producer.ByteArraySerializer", Importance.HIGH,
KEY_SERIALIZER_CLASS_DOC)
.define(VALUE_SERIALIZER_CLASS_CONFIG, Type.CLASS,
"org.apache.kafka.clients.producer.ByteArraySerializer", Importance.HIGH,
VALUE_SERIALIZER_CLASS_DOC);
}
   
Both serializers will implement the following interface.
   
public interface 
