Re: Log segment deletion

2018-01-30 Thread Guozhang Wang
Hi Martin,

That is a good point. In fact, in the coming release we have made such
repartition topics truly "transient" by periodically purging them with the
embedded admin client, so we can actually set their retention to -1:

https://cwiki.apache.org/confluence/display/KAFKA/KIP-220%3A+Add+AdminClient+into+Kafka+Streams%27+ClientSupplier
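
Roughly, the purge boils down to a deleteRecords() call per repartition
partition. Below is a minimal, simplified sketch with the plain Java
AdminClient (available in newer, 1.1+ clients); the topic name, offset, and
bootstrap address are placeholders, and the actual Streams implementation
differs in detail:

import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.RecordsToDelete;
import org.apache.kafka.common.TopicPartition;

public class RepartitionPurgeSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            // in Streams this would be the repartition topic and the last committed offset
            TopicPartition tp = new TopicPartition("my-app-repartition", 0);
            Map<TopicPartition, RecordsToDelete> toDelete =
                Collections.singletonMap(tp, RecordsToDelete.beforeOffset(12345L));
            admin.deleteRecords(toDelete).all().get(); // drops all records below that offset
        }
    }
}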


For non-internal topics, like the sink topics, though, you still need to
manually set the configs to allow old records to be appended.
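
For example, one way to do that with the Java AdminClient is sketched below.
The topic name and bootstrap address are placeholders, and note that
alterConfigs() replaces all topic-level overrides, so include any other
overrides you want to keep; this is only an illustration of the kind of
change meant above:

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

public class SinkTopicRetentionSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "my-sink-topic");
            // disable time-based deletion so old-timestamped records are not removed right away
            Config overrides = new Config(Collections.singletonList(
                new ConfigEntry("retention.ms", "-1")));
            admin.alterConfigs(Collections.singletonMap(topic, overrides)).all().get();
        }
    }
}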

Guozhang


On Tue, Jan 30, 2018 at 11:57 AM, Martin Kleppmann 
wrote:

> Hi Guozhang,
>
> Thanks very much for your reply. I am inclined to consider this a bug,
> since Kafka Streams in the default configuration is likely to run into this
> problem while reprocessing old messages, and in most cases the problem
> wouldn't be noticed (since there is no error -- the job just produces
> incorrect output).
>
> The repartitioning topics are already created with a config of
> cleanup.policy=delete, regardless of the brokers' default config. Would it
> make sense for Kafka Streams to also set a config of retention.ms=-1 or
> message.timestamp.type=LogAppendTime on repartitioning topics when they
> are created? However, neither setting is ideal (if time-based retention is
> set to infinite, retention.bytes needs to be configured instead; if
> LogAppendTime is used, the original message timestamps are lost, which may
> break windowing functions). Or maybe Kafka Streams can throw an exception
> if it processes messages that are older than the retention period, to
> ensure that the developer notices the problem, rather than having messages
> silently dropped?
>
> Best,
> Martin
>
> > On 29 Jan 2018, at 18:01, Guozhang Wang  wrote:
> >
> > Hello Martin,
> >
> > What you've observed is correct. More generally speaking, for various
> > broker-side operations that are based on record timestamps and treat them
> > as wall-clock time, there is a mismatch between the stream records'
> > timestamps, which are basically "event time", and the broker's system
> > wall-clock time (i.e. when the events happened vs. when they get ingested
> > into Kafka).
> >
> > For your example, when brokers decide when to roll a new segment or delete
> > an old segment, they are effectively comparing the record timestamp with
> > the system "wall-clock time". This has a bad effect on re-processing and
> > bootstrapping scenarios (think: processing a record whose timestamp is
> > from a week ago and then trying to send it to the intermediate topic at
> > system time "NOW").
> >
> > We are actively discussing how to close this gap. As of now, your
> > workaround solution looks good to me, or you can also consider setting the
> > broker config "log.message.timestamp.difference.max.ms" to a very long
> > value.
> >
> >
> > Guozhang
> >
> >
> > On Mon, Jan 29, 2018 at 8:23 AM, Martin Kleppmann 
> > wrote:
> >
> >> Follow-up: I think we figured out what was happening. Setting the broker
> >> config log.message.timestamp.type=LogAppendTime (instead of the default
> >> value CreateTime) stopped the messages disappearing.
> >>
> >> The messages in the Streams app's input topic are older than the 24
> hours
> >> default retention period. On this input topic, we have set an unlimited
> >> retention period. Am I right in thinking that in the setting
> >> message.timestamp.type=CreateTime, the input message timestamp is
> carried
> >> over to the messages in the repartitioning topic? And furthermore, the
> >> time-based retention of messages in the repartitioning topic is based on
> >> the message timestamps?
> >>
> >> If that is the case, then messages that are older than 24 hours are
> >> immediately scheduled for deletion as soon as they are copied over into
> the
> >> repartitioning topic. Thus, we have a race between the log cleaner and
> the
> >> consumer in the second stage of the streams app. :-(
> >>
> >> Is log.message.timestamp.type=LogAppendTime the best way of avoiding
> this
> >> problem?
> >>
> >> Thanks,
> >> Martin
> >>
> >>> On 29 Jan 2018, at 15:44, Martin Kleppmann 
> wrote:
> >>>
> >>> Hi all,
> >>>
> >>> We are debugging an issue with a Kafka Streams application that is
> >> producing incorrect output. The application is a simple group-by on a
> key,
> >> and then count. As expected, the application creates a repartitioning
> topic
> >> for the group-by stage. The problem appears to be that messages are
> getting
> >> lost in the repartitioning topic.
> >>>
> >>> Looking at the Kafka broker logs, it appears that the log segments for
> >> the repartitioning topic are getting marked for deletion very
> aggressively
> >> (within ~2 seconds of being created), so fast that some segments are
> >> deleted before the count stage of the Kafka Streams application has had
> a
> >> chance to consume the messages.
> >>>
> >>> I have checked the configuration and I cannot see a reason why the log
> >> segments should be getting deleted so quickly.

Re: Recommended max number of topics (and data separation)

2018-01-30 Thread Andrey Falko
On Tue, Jan 30, 2018 at 1:38 PM, David Espinosa  wrote:
> Hi Andrey,
> My topics are replicated with a replication factor equal to the number of
> nodes, 3 in this test.
> I didn't know about KIP-227.
> The problems I see at 70k topics coming from ZK are related to any
> operation where ZK has to retrieve topic metadata. Just listing topics at
> 50k or 60k, you will experience a big delay in the response. I have no more
> details about these problems, but it is easy to reproduce the latency in the
> topic list request.

AFAIK Kafka doesn't do a full topic listing from ZK as part of normal
operations. If your consumer/producer code needs to do --describe, then that
would be a problem, but I think it can be worked around. Based on my profiling
data so far, while things are working in non-failure mode, none of the ZK
functions pop up as "hot methods".

> Thanks for pointing me to this parameter, vm.max_map_count; it wasn't
> on my radar. Could you tell me what value you use?

I set it to the max allowable on Amzn Linux: vm.max_map_count=1215752192

> Regarding topic naming, it is the other way around: I think the longer the
> topic names are, the sooner jute.maxbuffer overflows.

I see; what value(s) have you tried with and how much gain did you see?

> David
>
>
> 2018-01-30 4:40 GMT+01:00 Andrey Falko :
>
>> On Sun, Jan 28, 2018 at 8:45 AM, David Espinosa  wrote:
>> > Hi Monty,
>> >
>> > I'm also planning to use a big amount of topics in Kafka, so recently I
>> > made a test within a 3 nodes kafka cluster where I created 100k topics
>> with
>> > one partition. Sent 1M messages in total.
>>
>> Are your topic partitions replicated?
>>
>> > These are my conclusions:
>> >
>> >- There is no limitation in Kafka itself regarding the number of topics,
>> >but there is in ZooKeeper and in the system where the Kafka nodes are allocated.
>>
>> There are also the problems being addressed in KIP-227:
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-227%3A+Introduce+Incremental+FetchRequests+to+Increase+Partition+Scalability
>>
>> >- ZooKeeper will start having problems from 70k topics, which can be
>> >worked around by modifying a buffer parameter on the JVM (-Djute.maxbuffer).
>> >Performance is reduced.
>>
>> What kind of problems do you see at 70k topics? If performance is
>> reduced by modifying jute.maxbuffer, won't that affect the performance
>> of Kafka in terms of how long it takes to recover from broker failure,
>> create/delete topics, and produce and consume?
>>
>> >- Open file descriptors of the system are equivalent to [number of
>> >topics] x [number of partitions per topic]. I set the limit to 128k in my
>> >test to avoid problems.
>> >- System needs a big amount of memory for page caching.
>>
>> I also had to tune vm.max_map_count much higher.
>>
>> >
>> > So, after creating 100k topics with the required setup (system + JVM) but
>> > seeing problems at 70k, I feel safe not creating more than 50k, and I will
>> > always have ZooKeeper as my first suspect if a problem comes. I think with
>> > proper resources (memory) and system setup (open file descriptors), you
>> > don't have any real limitation regarding partitions.
>>
>> I can confirm the 50k number. After about 40k-45k topics, I start
>> seeing consumer offset commit latencies slow down and exceed 50 ms.
>> Hopefully KIP-227 will alleviate that problem and leave ZK as the last
>> remaining hurdle. I'm testing with 3x replication per partition and 10
>> brokers.
>>
>> > By the way, I used long topic names (about 30 characters), which can be
>> > important for ZK.
>>
>> I'd like to learn more about this: are you saying that long topic
>> names would improve ZK performance because that relates to bumping up
>> jute.maxbuffer?
>>
>> > Hope this information is of help to you.
>> >
>> > David
>> >
>> > 2018-01-28 2:22 GMT+01:00 Monty Hindman :
>> >
>> >> I'm designing a system and need some more clarity regarding Kafka's
>> >> recommended limits on the number of topics and/or partitions. At a high
>> >> level, our system would work like this:
>> >>
>> >> - A user creates a job X (X is a UUID).
>> >> - The user uploads data for X to an input topic: X.in.
>> >> - Workers process the data, writing results to an output topic: X.out.
>> >> - The user downloads the data from X.out.
>> >>
>> >> It's important for the system that data for different jobs be kept
>> >> separate, and that input and output data be kept separate. By
>> "separate" I
>> >> mean that there needs to be a reasonable way for users and the system's
>> >> workers to query for the data they need (by job-id and by
>> input-vs-output)
>> >> and not get the data they don't need.
>> >>
>> >> Based on expected usage and our data retention policy, we would not
>> expect
>> >> to need more than 12,000 active jobs at any one time -- in other words,
>> >> 24,000 topics. If we were to have 5 partitions per topic (our cluster
>> has 5
>> >> brokers), that would imply 120,000 partitions.

Re: ReadOnlyKeyValueStore.range API

2018-01-30 Thread Matthias J. Sax
You need to write some custom code using Interactive Queries and
implement a scatter-gather pattern.

Basically, you need to do the range on each instance and then merge all
partial results.

https://kafka.apache.org/10/documentation/streams/developer-guide/interactive-queries.html

You can also find an example for IQ here:
https://github.com/confluentinc/kafka-streams-examples
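
A rough sketch of the idea is below; the store name, key/value types, and the
httpGetRange() helper are placeholders (you would implement that call against
whatever RPC layer you expose, and the java.util / org.apache.kafka.streams
imports are omitted), so treat it as an outline rather than a ready-made API:

// on each instance: serve the *local* part of the range
List<KeyValue<String, Long>> localRange(KafkaStreams streams, String store,
                                        String from, String to) {
    ReadOnlyKeyValueStore<String, Long> kv =
        streams.store(store, QueryableStoreTypes.<String, Long>keyValueStore());
    List<KeyValue<String, Long>> result = new ArrayList<>();
    try (KeyValueIterator<String, Long> it = kv.range(from, to)) {
        while (it.hasNext()) result.add(it.next());
    }
    return result;
}

// on the instance answering the query: fan out to all instances and merge
List<KeyValue<String, Long>> globalRange(KafkaStreams streams, String store,
                                         String from, String to) {
    List<KeyValue<String, Long>> merged = new ArrayList<>();
    for (StreamsMetadata md : streams.allMetadataForStore(store)) {
        // hypothetical helper: calls the peer's REST endpoint (or localRange()
        // if md refers to this instance) and returns its partial result
        merged.addAll(httpGetRange(md.hostInfo(), store, from, to));
    }
    // partial results arrive per instance, so they are not globally ordered
    merged.sort(Comparator.comparing((KeyValue<String, Long> kv) -> kv.key));
    return merged;
}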


-Matthias

On 1/29/18 10:49 PM, Debasish Ghosh wrote:
> Hello -
> 
> The above API gives me the range of values between fromKey and toKey for a
> local state store.
> 
> Suppose I have an application running in distributed mode (multiple nodes,
> same application id). How does this API translate to multiple nodes? I
> know the basic implementation is for a local node. But is there an
> algorithm to implement this range() function for the distributed setting?
> 
> I can find out the host where fromKey lives, and also the host where toKey
> lives. But how do I know the elements in between?
> 
> Any pointer will be appreciated ..
> 
> regards.
> 





Re: Recommended max number of topics (and data separation)

2018-01-30 Thread David Espinosa
Hi Andrey,
My topics are replicated with a replication factor equal to the number of
nodes, 3 in this test.
I didn't know about KIP-227.
The problems I see at 70k topics coming from ZK are related to any
operation where ZK has to retrieve topic metadata. Just listing topics at
50k or 60k, you will experience a big delay in the response. I have no more
details about these problems, but it is easy to reproduce the latency in the
topic list request.
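
For anyone who wants to reproduce it, here is a minimal sketch that times a
topic listing with the Java AdminClient (the bootstrap address is a placeholder):

import java.util.Properties;
import java.util.Set;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;

public class ListTopicsLatency {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            long start = System.nanoTime();
            Set<String> names = admin.listTopics().names().get(); // full topic listing
            long elapsedMs = (System.nanoTime() - start) / 1_000_000;
            System.out.printf("listed %d topics in %d ms%n", names.size(), elapsedMs);
        }
    }
}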
Thanks for pointing me to this parameter, vm.max_map_count; it wasn't
on my radar. Could you tell me what value you use?
Regarding topic naming, it is the other way around: I think the longer the
topic names are, the sooner jute.maxbuffer overflows.
David


2018-01-30 4:40 GMT+01:00 Andrey Falko :

> On Sun, Jan 28, 2018 at 8:45 AM, David Espinosa  wrote:
> > Hi Monty,
> >
> > I'm also planning to use a big amount of topics in Kafka, so recently I
> > made a test within a 3 nodes kafka cluster where I created 100k topics
> with
> > one partition. Sent 1M messages in total.
>
> Are your topic partitions replicated?
>
> > These are my conclusions:
> >
> >- There is no limitation in Kafka itself regarding the number of topics,
> >but there is in ZooKeeper and in the system where the Kafka nodes are allocated.
>
> There are also the problems being addressed in KIP-227:
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-227%3A+Introduce+Incremental+FetchRequests+to+Increase+Partition+Scalability
>
> >- ZooKeeper will start having problems from 70k topics, which can be
> >worked around by modifying a buffer parameter on the JVM (-Djute.maxbuffer).
> >Performance is reduced.
>
> What kind of problems do you see at 70k topics? If performance is
> reduced by modifying jute.maxbuffer, won't that affect the performance
> of Kafka in terms of how long it takes to recover from broker failure,
> create/delete topics, and produce and consume?
>
> >- Open file descriptors of the system are equivalent to [number of
> >topics] x [number of partitions per topic]. I set the limit to 128k in my
> >test to avoid problems.
> >- System needs a big amount of memory for page caching.
>
> I also had to tune vm.max_map_count much higher.
>
> >
> > So, after creating 100k topics with the required setup (system + JVM) but
> > seeing problems at 70k, I feel safe not creating more than 50k, and I will
> > always have ZooKeeper as my first suspect if a problem comes. I think with
> > proper resources (memory) and system setup (open file descriptors), you
> > don't have any real limitation regarding partitions.
>
> I can confirm the 50k number. After about 40k-45k topics, I start
> seeing consumer offset commit latencies slow down and exceed 50 ms.
> Hopefully KIP-227 will alleviate that problem and leave ZK as the last
> remaining hurdle. I'm testing with 3x replication per partition and 10
> brokers.
>
> > By the way, I used long topic names (about 30 characters), which can be
> > important for ZK.
>
> I'd like to learn more about this: are you saying that long topic
> names would improve ZK performance because that relates to bumping up
> jute.maxbuffer?
>
> > Hope this information is of help to you.
> >
> > David
> >
> > 2018-01-28 2:22 GMT+01:00 Monty Hindman :
> >
> >> I'm designing a system and need some more clarity regarding Kafka's
> >> recommended limits on the number of topics and/or partitions. At a high
> >> level, our system would work like this:
> >>
> >> - A user creates a job X (X is a UUID).
> >> - The user uploads data for X to an input topic: X.in.
> >> - Workers process the data, writing results to an output topic: X.out.
> >> - The user downloads the data from X.out.
> >>
> >> It's important for the system that data for different jobs be kept
> >> separate, and that input and output data be kept separate. By
> "separate" I
> >> mean that there needs to be a reasonable way for users and the system's
> >> workers to query for the data they need (by job-id and by
> input-vs-output)
> >> and not get the data they don't need.
> >>
> >> Based on expected usage and our data retention policy, we would not
> expect
> >> to need more than 12,000 active jobs at any one time -- in other words,
> >> 24,000 topics. If we were to have 5 partitions per topic (our cluster
> has 5
> >> brokers), that would imply 120,000 partitions. [These numbers refer only
> to
> >> main/primary partitions, not any replicas that might exist.]
> >>
> >> Those numbers seem to be far larger than the suggested limits I see
> online.
> >> For example, the Kafka FAQ on these matters seems to imply that the most
> >> relevant limit is the number of partitions (rather than topics) and
> sort of
> >> implies that 10,000 partitions might be a suggested guideline (
> >> https://goo.gl/fQs2md). Also implied is that systems should use fewer
> >> topics and instead partition the data within topics if further
> separation
> >> is needed (the FAQ entry uses the example of partitioning by user ID,
> which
> >> is roughly an

Kafka Consumer Offsets unavailable during rebalancing

2018-01-30 Thread Wouter Bancken
Hi,

I'm trying to write an external tool to monitor consumer lag on Apache
Kafka.

For this purpose, I'm using the kafka-consumer-groups tool to fetch the
consumer offsets.

When using this tool, partition assignments seem to be unavailable
temporarily during the creation of a new topic even if the consumer group
has no subscription on this new topic. This seems to match the documentation,
which says "Topic metadata changes which have no impact on subscriptions cause
resync".

However, when this occurs I'd expect the state of the consumer to be
"PreparingRebalance" or "AwaitingSync" but it is simply "Stable".

Is this a bug in the tooling or is there a different way to obtain the
correct offsets for a consumer group during a rebalance?

I'm using Kafka 0.10.2.1 but I haven't found any related issues in recent
changelogs.
Best regards,
Wouter


Re: Log segment deletion

2018-01-30 Thread Martin Kleppmann
Hi Guozhang,

Thanks very much for your reply. I am inclined to consider this a bug, since 
Kafka Streams in the default configuration is likely to run into this problem 
while reprocessing old messages, and in most cases the problem wouldn't be 
noticed (since there is no error -- the job just produces incorrect output).

The repartitioning topics are already created with a config of 
cleanup.policy=delete, regardless of the brokers' default config. Would it make 
sense for Kafka Streams to also set a config of retention.ms=-1 or 
message.timestamp.type=LogAppendTime on repartitioning topics when they are 
created? However, neither setting is ideal (if time-based retention is set to 
infinite, retention.bytes needs to be configured instead; if LogAppendTime is 
used, the original message timestamps are lost, which may break windowing 
functions). Or maybe Kafka Streams can throw an exception if it processes 
messages that are older than the retention period, to ensure that the developer 
notices the problem, rather than having messages silently dropped?
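
(For illustration only -- Streams creates its repartition topics internally, so
this is just to show the kind of per-topic override being discussed; the topic
name, partition/replica counts, and size limit below are made up. Creating a
topic with such overrides via the Java AdminClient would look roughly like this:)

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class RepartitionTopicConfigSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        Map<String, String> configs = new HashMap<>();
        configs.put("cleanup.policy", "delete");
        configs.put("retention.ms", "-1");            // no time-based deletion ...
        configs.put("retention.bytes", "1073741824"); // ... so bound the size instead (1 GB, arbitrary)
        // alternatively: configs.put("message.timestamp.type", "LogAppendTime");
        NewTopic topic = new NewTopic("example-repartition-topic", 5, (short) 3).configs(configs);
        try (AdminClient admin = AdminClient.create(props)) {
            admin.createTopics(Collections.singletonList(topic)).all().get();
        }
    }
}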

Best,
Martin

> On 29 Jan 2018, at 18:01, Guozhang Wang  wrote:
> 
> Hello Martin,
> 
> What you've observed is correct. More generally speaking, for various
> broker-side operations that are based on record timestamps and treat them as
> wall-clock time, there is a mismatch between the stream records' timestamps,
> which are basically "event time", and the broker's system wall-clock
> time (i.e. when the events happened vs. when they get ingested into Kafka).
> 
> For your example, when brokers decide when to roll a new segment or delete
> an old segment, they are effectively comparing the record timestamp with
> the system "wall-clock time". This has a bad effect on re-processing and
> bootstrapping scenarios (think: processing a record whose timestamp is
> from a week ago and then trying to send it to the intermediate topic at
> system time "NOW").
> 
> We are actively discussing how to close this gap. As of now, your
> workaround solution looks good to me, or you can also consider setting the
> broker config "log.message.timestamp.difference.max.ms" to a very long
> value.
> 
> 
> Guozhang
> 
> 
> On Mon, Jan 29, 2018 at 8:23 AM, Martin Kleppmann 
> wrote:
> 
>> Follow-up: I think we figured out what was happening. Setting the broker
>> config log.message.timestamp.type=LogAppendTime (instead of the default
>> value CreateTime) stopped the messages disappearing.
>> 
>> The messages in the Streams app's input topic are older than the 24 hours
>> default retention period. On this input topic, we have set an unlimited
>> retention period. Am I right in thinking that in the setting
>> message.timestamp.type=CreateTime, the input message timestamp is carried
>> over to the messages in the repartitioning topic? And furthermore, the
>> time-based retention of messages in the repartitioning topic is based on
>> the message timestamps?
>> 
>> If that is the case, then messages that are older than 24 hours are
>> immediately scheduled for deletion as soon as they are copied over into the
>> repartitioning topic. Thus, we have a race between the log cleaner and the
>> consumer in the second stage of the streams app. :-(
>> 
>> Is log.message.timestamp.type=LogAppendTime the best way of avoiding this
>> problem?
>> 
>> Thanks,
>> Martin
>> 
>>> On 29 Jan 2018, at 15:44, Martin Kleppmann  wrote:
>>> 
>>> Hi all,
>>> 
>>> We are debugging an issue with a Kafka Streams application that is
>> producing incorrect output. The application is a simple group-by on a key,
>> and then count. As expected, the application creates a repartitioning topic
>> for the group-by stage. The problem appears to be that messages are getting
>> lost in the repartitioning topic.
>>> 
>>> Looking at the Kafka broker logs, it appears that the log segments for
>> the repartitioning topic are getting marked for deletion very aggressively
>> (within ~2 seconds of being created), so fast that some segments are
>> deleted before the count stage of the Kafka Streams application has had a
>> chance to consume the messages.
>>> 
>>> I have checked the configuration and I cannot see a reason why the log
>> segments should be getting deleted so quickly. The following line reports
>> the creation of the repartitioning topic:
>>> 
>>> [2018-01-29 15:31:39,992] INFO Created log for partition
>> [streamsapp_site_stats-v290118-debug_1517239899448-
>> localstore_log_event_counts-repartition,0] in /kafka-data with properties
>> {compression.type -> producer, message.format.version -> 0.11.0-IV2,
>> file.delete.delay.ms -> 10, max.message.bytes -> 112,
>> min.compaction.lag.ms -> 0, message.timestamp.type -> CreateTime,
>> min.insync.replicas -> 1, segment.jitter.ms -> 0, preallocate -> false,
>> min.cleanable.dirty.ratio -> 0.5, index.interval.bytes -> 4096,
>> unclean.leader.election.enable -> false, retention.bytes -> 1073741824,
>> d

what drives deletion of kafka topics

2018-01-30 Thread karan alang
I have Kafka 0.10.
I have a basic question: what determines when a Kafka topic that is marked for
deletion actually gets deleted?

Today, I marked a topic for deletion and it got deleted immediately
(possibly because the topic had not been used for the last few months?).
In earlier instances, I had to wait for some time before I could see the
topic deleted.

Any ideas?

Also, if a topic is marked for deletion, can it be un-marked so it does not
get deleted?


Re: skipped-records-rate vs skippedDueToDeserializationError-rate metric in streams app

2018-01-30 Thread Guozhang Wang
Your code for setting the handler seems right to me.

Another thing to double-check: have you turned on DEBUG-level metrics recording
for this metric? Note that skippedDueToDeserializationError is recorded at
DEBUG level, so you need to set metrics.recording.level accordingly
(the default is INFO). Metrics at a lower level than this config will not be
recorded or reported.

Also note that turning on DEBUG-level metrics recording has an impact on
your application's performance, since it is mostly for finer granularity
(per processor node, per task, etc.) and hence the recording overhead is higher
than for the INFO metrics, which are global thread-level sensors.
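
For reference, it is just a regular entry in the same Properties you already
build, e.g. inside the getProperties() method you pasted (a minimal sketch,
using the plain string key; the default value is "INFO"):

// enable DEBUG-level sensors such as skippedDueToDeserializationError-rate
p.put("metrics.recording.level", "DEBUG");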


Guozhang


On Tue, Jan 30, 2018 at 4:23 AM, Srikanth  wrote:

> Guozhang,
>
> Here is the snippet.
>
> private Properties getProperties() {
>     Properties p = new Properties();
>     ...
>     p.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, kafkaConfig.getString("streamThreads"));
>     p.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
>           LogAndContinueExceptionHandler.class);
>     ...
>     return p;
> }
>
> StreamsConfig streamsConfig = new StreamsConfig(getProperties());
> KafkaStreams kafkaStreams = new KafkaStreams(streamBuilder.build(), streamsConfig);
>
> Srikanth
>
> On Mon, Jan 29, 2018 at 11:10 PM, Guozhang Wang 
> wrote:
>
> > Hi Srikanth,
> >
> > How did you set the LogAndContinueExceptionHandler in the configs? Could
> > you copy the code snippet here?
> >
> > Guozhang
> >
> >
> > On Sun, Jan 28, 2018 at 11:26 PM, Srikanth 
> wrote:
> >
> > > Kafka-streams version "1.0.0".
> > >
> > > Thanks,
> > > Srikanth
> > >
> > > On Mon, Jan 29, 2018 at 12:23 AM, Guozhang Wang 
> > > wrote:
> > >
> > > > Hello Srikanth,
> > > >
> > > > Which version of Kafka are you using? I'd like to dig for that
> > particular
> > > > branch again.
> > > >
> > > > Guozhang
> > > >
> > > > On Sun, Jan 28, 2018 at 8:54 AM, Srikanth 
> > wrote:
> > > >
> > > > > Guozhang,
> > > > >
> > > > > While I understand that this metric is meaningless when handler is
> > set
> > > to
> > > > > FAIL, in my case I'm actually using LogAndContinueExceptionHandler
> .
> > > > > In this case, app needs to report such occurrences. What I noticed
> is
> > > > that
> > > > > only skipped-records is set.
> > > > > The granularity offered by skippedDueToDeserializationError is
> > > missing.
> > > > >
> > > > > Srikanth
> > > > >
> > > > > On Fri, Jan 26, 2018 at 10:45 PM, Guozhang Wang <
> wangg...@gmail.com>
> > > > > wrote:
> > > > >
> > > > > > Hi Srikanth,
> > > > > >
> > > > > > Looked at the source code once again and discussing with other
> > > > committer
> > > > > I
> > > > > > now remembered why we designed it that way: when you set the
> > > > > > HandlerResponse to FAIL, it means that once a "poison record" is
> > > > > received,
> > > > > > stop the world by throwing this exception all the way up. And
> hence
> > > at
> > > > > that
> > > > > > time the application would be stopped anyways so we would not
> need
> > to
> > > > > > record this metric.
> > > > > >
> > > > > > So in other words, I think it is rather a documentation
> improvement
> > > > that
> > > > > we
> > > > > > should do.
> > > > > >
> > > > > >
> > > > > > Guozhang
> > > > > >
> > > > > >
> > > > > > On Fri, Jan 26, 2018 at 8:56 AM, Guozhang Wang <
> wangg...@gmail.com
> > >
> > > > > wrote:
> > > > > >
> > > > > > > Hello Srikanth,
> > > > > > >
> > > > > > > Thanks for reporting this, as I checked the code
> > > > > > > skippedDueToDeserializationError is effectively only recorded when the
> > > > > > > DeserializationHandlerResponse is not set to FAIL. I agree it is not
> > > > > > > exactly matching the documentation's guidance, and will try to file a
> > > > > > > JIRA and fix it.
> > > > > > >
> > > > > > > As for skippedDueToDeserializationError and skipped-records-rate,
> > > > > > > there is an open JIRA discussing this:
> > > > > > > https://issues.apache.org/jira/browse/KAFKA-6376, just FYI.
> > > > > > >
> > > > > > >
> > > > > > > Could you share which version of Kafka you observed this issue on?
> > > > > > >
> > > > > > > Guozhang
> > > > > > >
> > > > > > > On Fri, Jan 26, 2018 at 6:30 AM, Srikanth <
> srikanth...@gmail.com
> > >
> > > > > wrote:
> > > > > > >
> > > > > > >> Hello,
> > > > > > >>
> > > > > > >> As per doc when LogAndContinueExceptionHandler is used it will
> > set
> > > > > > >> skippedDueToDeserializationError-rate metric to indicate
> > > > > > deserialization
> > > > > > >> error.
> > > > > > >> I notice that it is never set. Instead skipped-records-rate is
> > > set.
> > > > My
> > > > > > >> understanding was that skipped-records-rate is set due to
> > > timestamp
> > > > > > >> extraction errors.
> > > > > > >>
> > > > > > >> Ex, I sent a few invalid records to a topic and was able to
> see
> > > > > > exception
> > > > > > >> during deserialization.
> > > > > > >>
> > > > > > >> org.apache.kafka.commo

Re: monitor consumer offset lag script/code

2018-01-30 Thread Kaufman Ng
Alternatively, you can dump out the consumer offsets using a command like
this:

kafka-console-consumer --topic __consumer_offsets --bootstrap-server
localhost:9092 --formatter
"kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter"

On Tue, Jan 30, 2018 at 8:38 AM, Subhash Sriram 
wrote:

> Hi Sunil,
>
> Burrow might be of interest to you:
>
> https://github.com/linkedin/Burrow
>
> Hope that helps.
>
> Thanks,
> Subhash
>
> Sent from my iPhone
>
> > On Jan 29, 2018, at 7:40 PM, Sunil Parmar  wrote:
> >
> > We're using 0.9 ( CDH ) and consumer offsets are stored within Kafka.
> What
> > is the preferred way to get consumer offset from code or script for
> > monitoring ? Is there any sample code/ script to do so ?
> >
> > Thanks,
> > Sunil Parmar
>



-- 
Kaufman Ng
+1 646 961 8063
Solutions Architect | Confluent | www.confluent.io


Re: Group consumer cannot consume messages if kafka service on specific node in test cluster is down

2018-01-30 Thread Zoran
Sorry, I attached the wrong server.properties file. The right one is now
in the attachment.


Regards.


On 01/30/2018 02:59 PM, Zoran wrote:

Hi,

I have three servers:

blade1 (192.168.112.31),
blade2 (192.168.112.32) and
blade3 (192.168.112.33).

On each of servers kafka_2.11-1.0.0 is installed.
On blade3 (192.168.112.33:2181) zookeeper is installed as well.

I have created a topic repl3part5 with the following line:

bin/kafka-topics.sh --zookeeper 192.168.112.33:2181 --create 
--replication-factor 3 --partitions 5 --topic repl3part5


When I describe the topic, it looks like this:

[root@blade1 kafka]# bin/kafka-topics.sh --describe --topic repl3part5 
--zookeeper 192.168.112.33:2181


Topic:repl3part5    PartitionCount:5    ReplicationFactor:3    Configs:
    Topic: repl3part5    Partition: 0    Leader: 2    Replicas: 2,3,1    Isr: 2,3,1
    Topic: repl3part5    Partition: 1    Leader: 3    Replicas: 3,1,2    Isr: 3,1,2
    Topic: repl3part5    Partition: 2    Leader: 1    Replicas: 1,2,3    Isr: 1,2,3
    Topic: repl3part5    Partition: 3    Leader: 2    Replicas: 2,1,3    Isr: 2,1,3
    Topic: repl3part5    Partition: 4    Leader: 3    Replicas: 3,2,1    Isr: 3,2,1


I have a producer for this topic:

bin/kafka-console-producer.sh --broker-list 
192.168.112.31:9092,192.168.112.32:9092,192.168.112.33:9092 --topic 
repl3part5


and single consumer:

bin/kafka-console-consumer.sh --bootstrap-server 
192.168.112.31:9092,192.168.112.32:9092,192.168.112.33:9092 --topic 
repl3part5  --consumer-property group.id=zoran_1


Every message that is sent by the producer gets collected by the consumer.
So far, so good.


Now I would like to test failover of the Kafka servers. If I take down the
blade3 Kafka service, I get consumer warnings but all produced messages are
still consumed.


[2018-01-30 14:30:01,203] WARN [Consumer clientId=consumer-1, 
groupId=zoran_1] Connection to node 3 could not be established. Broker 
may not be available. (org.apache.kafka.clients.NetworkClient)
[2018-01-30 14:30:01,299] WARN [Consumer clientId=consumer-1, 
groupId=zoran_1] Connection to node 3 could not be established. Broker 
may not be available. (org.apache.kafka.clients.NetworkClient)
[2018-01-30 14:30:01,475] WARN [Consumer clientId=consumer-1, 
groupId=zoran_1] Connection to node 3 could not be established. Broker 
may not be available. (org.apache.kafka.clients.NetworkClient)


Now I have started up the Kafka service on blade3 and taken down the Kafka
service on blade2.
The consumer now showed one warning, but all produced messages are still
consumed.


[2018-01-30 14:31:38,164] WARN [Consumer clientId=consumer-1, 
groupId=zoran_1] Connection to node 2 could not be established. Broker 
may not be available. (org.apache.kafka.clients.NetworkClient)


Now I have started up the Kafka service on blade2 and taken down the Kafka
service on blade1.


The consumer now shows warnings about node 1/2147483646, but also
"Asynchronous auto-commit of offsets ... failed: Offset commit failed
with a retriable exception. You should retry committing offsets. The
underlying error was: null."


[2018-01-30 14:33:16,393] WARN [Consumer clientId=consumer-1, 
groupId=zoran_1] Connection to node 1 could not be established. Broker 
may not be available. (org.apache.kafka.clients.NetworkClient)
[2018-01-30 14:33:16,469] WARN [Consumer clientId=consumer-1, 
groupId=zoran_1] Connection to node 2147483646 could not be 
established. Broker may not be available. 
(org.apache.kafka.clients.NetworkClient)
[2018-01-30 14:33:16,557] WARN [Consumer clientId=consumer-1, 
groupId=zoran_1] Connection to node 1 could not be established. Broker 
may not be available. (org.apache.kafka.clients.NetworkClient)
[2018-01-30 14:33:16,986] WARN [Consumer clientId=consumer-1, 
groupId=zoran_1] Connection to node 2147483646 could not be 
established. Broker may not be available. 
(org.apache.kafka.clients.NetworkClient)
[2018-01-30 14:33:16,991] WARN [Consumer clientId=consumer-1, 
groupId=zoran_1] Connection to node 1 could not be established. Broker 
may not be available. (org.apache.kafka.clients.NetworkClient)
[2018-01-30 14:33:17,493] WARN [Consumer clientId=consumer-1, 
groupId=zoran_1] Connection to node 2147483646 could not be 
established. Broker may not be available. 
(org.apache.kafka.clients.NetworkClient)
[2018-01-30 14:33:17,495] WARN [Consumer clientId=consumer-1, 
groupId=zoran_1] Connection to node 1 could not be established. Broker 
may not be available. (org.apache.kafka.clients.NetworkClient)
[2018-01-30 14:33:18,002] WARN [Consumer clientId=consumer-1, 
groupId=zoran_1] Connection to node 2147483646 could not be 
established. Broker may not be available. 
(org.apache.kafka.clients.NetworkClient)
[2018-01-30 14:33:18,003] WARN [Consumer clientId=consumer-1, 
groupId=zoran_1] Asynchronous auto-commit of offsets 
{repl3part5-4=OffsetAndMetadata{offset=18, metadata=''}, 
repl3part5-3=OffsetAndMetadata{offset=20, metadata=''}, 
repl3part5-2=OffsetAn

Group consumer cannot consume messages if kafka service on specific node in test cluster is down

2018-01-30 Thread Zoran

Hi,

I have three servers:

blade1 (192.168.112.31),
blade2 (192.168.112.32) and
blade3 (192.168.112.33).

On each of servers kafka_2.11-1.0.0 is installed.
On blade3 (192.168.112.33:2181) zookeeper is installed as well.

I have created a topic repl3part5 with the following line:

bin/kafka-topics.sh --zookeeper 192.168.112.33:2181 --create 
--replication-factor 3 --partitions 5 --topic repl3part5


When I describe the topic, it looks like this:

[root@blade1 kafka]# bin/kafka-topics.sh --describe --topic repl3part5 
--zookeeper 192.168.112.33:2181


Topic:repl3part5    PartitionCount:5    ReplicationFactor:3    Configs:
    Topic: repl3part5    Partition: 0    Leader: 2    Replicas: 2,3,1    Isr: 2,3,1
    Topic: repl3part5    Partition: 1    Leader: 3    Replicas: 3,1,2    Isr: 3,1,2
    Topic: repl3part5    Partition: 2    Leader: 1    Replicas: 1,2,3    Isr: 1,2,3
    Topic: repl3part5    Partition: 3    Leader: 2    Replicas: 2,1,3    Isr: 2,1,3
    Topic: repl3part5    Partition: 4    Leader: 3    Replicas: 3,2,1    Isr: 3,2,1


I have a producer for this topic:

bin/kafka-console-producer.sh --broker-list 
192.168.112.31:9092,192.168.112.32:9092,192.168.112.33:9092 --topic 
repl3part5


and single consumer:

bin/kafka-console-consumer.sh --bootstrap-server 
192.168.112.31:9092,192.168.112.32:9092,192.168.112.33:9092 --topic 
repl3part5  --consumer-property group.id=zoran_1


Every message that is sent by the producer gets collected by the consumer.
So far, so good.


Now I would like to test failover of the Kafka servers. If I take down the
blade3 Kafka service, I get consumer warnings but all produced messages are
still consumed.


[2018-01-30 14:30:01,203] WARN [Consumer clientId=consumer-1, 
groupId=zoran_1] Connection to node 3 could not be established. Broker 
may not be available. (org.apache.kafka.clients.NetworkClient)
[2018-01-30 14:30:01,299] WARN [Consumer clientId=consumer-1, 
groupId=zoran_1] Connection to node 3 could not be established. Broker 
may not be available. (org.apache.kafka.clients.NetworkClient)
[2018-01-30 14:30:01,475] WARN [Consumer clientId=consumer-1, 
groupId=zoran_1] Connection to node 3 could not be established. Broker 
may not be available. (org.apache.kafka.clients.NetworkClient)


Now I have started up the Kafka service on blade3 and taken down the Kafka
service on blade2.
The consumer now showed one warning, but all produced messages are still
consumed.


[2018-01-30 14:31:38,164] WARN [Consumer clientId=consumer-1, 
groupId=zoran_1] Connection to node 2 could not be established. Broker 
may not be available. (org.apache.kafka.clients.NetworkClient)


Now I have started up the Kafka service on blade2 and taken down the Kafka
service on blade1.


The consumer now shows warnings about node 1/2147483646, but also
"Asynchronous auto-commit of offsets ... failed: Offset commit failed
with a retriable exception. You should retry committing offsets. The
underlying error was: null."


[2018-01-30 14:33:16,393] WARN [Consumer clientId=consumer-1, 
groupId=zoran_1] Connection to node 1 could not be established. Broker 
may not be available. (org.apache.kafka.clients.NetworkClient)
[2018-01-30 14:33:16,469] WARN [Consumer clientId=consumer-1, 
groupId=zoran_1] Connection to node 2147483646 could not be established. 
Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2018-01-30 14:33:16,557] WARN [Consumer clientId=consumer-1, 
groupId=zoran_1] Connection to node 1 could not be established. Broker 
may not be available. (org.apache.kafka.clients.NetworkClient)
[2018-01-30 14:33:16,986] WARN [Consumer clientId=consumer-1, 
groupId=zoran_1] Connection to node 2147483646 could not be established. 
Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2018-01-30 14:33:16,991] WARN [Consumer clientId=consumer-1, 
groupId=zoran_1] Connection to node 1 could not be established. Broker 
may not be available. (org.apache.kafka.clients.NetworkClient)
[2018-01-30 14:33:17,493] WARN [Consumer clientId=consumer-1, 
groupId=zoran_1] Connection to node 2147483646 could not be established. 
Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2018-01-30 14:33:17,495] WARN [Consumer clientId=consumer-1, 
groupId=zoran_1] Connection to node 1 could not be established. Broker 
may not be available. (org.apache.kafka.clients.NetworkClient)
[2018-01-30 14:33:18,002] WARN [Consumer clientId=consumer-1, 
groupId=zoran_1] Connection to node 2147483646 could not be established. 
Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2018-01-30 14:33:18,003] WARN [Consumer clientId=consumer-1, 
groupId=zoran_1] Asynchronous auto-commit of offsets 
{repl3part5-4=OffsetAndMetadata{offset=18, metadata=''}, 
repl3part5-3=OffsetAndMetadata{offset=20, metadata=''}, 
repl3part5-2=OffsetAndMetadata{offset=19, metadata=''}, 
repl3part5-1=OffsetAndMetadata{offset=20, metadata=''}, 
repl3part5-0=OffsetAndMetadata{offset=20, metadata=''}}

Re: monitor consumer offset lag script/code

2018-01-30 Thread Subhash Sriram
Hi Sunil,

Burrow might be of interest to you:

https://github.com/linkedin/Burrow

Hope that helps.

Thanks,
Subhash

Sent from my iPhone

> On Jan 29, 2018, at 7:40 PM, Sunil Parmar  wrote:
> 
> We're using 0.9 ( CDH ) and consumer offsets are stored within Kafka. What
> is the preferred way to get consumer offset from code or script for
> monitoring ? Is there any sample code/ script to do so ?
> 
> Thanks,
> Sunil Parmar


Re: skipped-records-rate vs skippedDueToDeserializationError-rate metric in streams app

2018-01-30 Thread Srikanth
Guozhang,

Here is the snippet.

private Properties getProperties() {
    Properties p = new Properties();
    ...
    p.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, kafkaConfig.getString("streamThreads"));
    p.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
          LogAndContinueExceptionHandler.class);
    ...
    return p;
}

StreamsConfig streamsConfig = new StreamsConfig(getProperties());
KafkaStreams kafkaStreams = new KafkaStreams(streamBuilder.build(), streamsConfig);

Srikanth

On Mon, Jan 29, 2018 at 11:10 PM, Guozhang Wang  wrote:

> Hi Srikanth,
>
> How did you set the LogAndContinueExceptionHandler in the configs? Could
> you copy the code snippet here?
>
> Guozhang
>
>
> On Sun, Jan 28, 2018 at 11:26 PM, Srikanth  wrote:
>
> > Kafka-streams version "1.0.0".
> >
> > Thanks,
> > Srikanth
> >
> > On Mon, Jan 29, 2018 at 12:23 AM, Guozhang Wang 
> > wrote:
> >
> > > Hello Srikanth,
> > >
> > > Which version of Kafka are you using? I'd like to dig for that
> particular
> > > branch again.
> > >
> > > Guozhang
> > >
> > > On Sun, Jan 28, 2018 at 8:54 AM, Srikanth 
> wrote:
> > >
> > > > Guozhang,
> > > >
> > > > While I understand that this metric is meaningless when handler is
> set
> > to
> > > > FAIL, in my case I'm actually using LogAndContinueExceptionHandler.
> > > > In this case, app needs to report such occurrences. What I noticed is
> > > that
> > > > only skipped-records is set.
> > > > The granularity offered by skippedDueToDeserializationError is
> > missing.
> > > >
> > > > Srikanth
> > > >
> > > > On Fri, Jan 26, 2018 at 10:45 PM, Guozhang Wang 
> > > > wrote:
> > > >
> > > > > Hi Srikanth,
> > > > >
> > > > > Looked at the source code once again and discussing with other
> > > committer
> > > > I
> > > > > now remembered why we designed it that way: when you set the
> > > > > HandlerResponse to FAIL, it means that once a "poison record" is
> > > > received,
> > > > > stop the world by throwing this exception all the way up. And hence
> > at
> > > > that
> > > > > time the application would be stopped anyways so we would not need
> to
> > > > > record this metric.
> > > > >
> > > > > So in other words, I think it is rather a documentation improvement
> > > that
> > > > we
> > > > > should do.
> > > > >
> > > > >
> > > > > Guozhang
> > > > >
> > > > >
> > > > > On Fri, Jan 26, 2018 at 8:56 AM, Guozhang Wang  >
> > > > wrote:
> > > > >
> > > > > > Hello Srikanth,
> > > > > >
> > > > > > Thanks for reporting this, as I checked the code
> > > > > > skippedDueToDeserializationError is effectively only recorded when the
> > > > > > DeserializationHandlerResponse is not set to FAIL. I agree it is not
> > > > > > exactly matching the documentation's guidance, and will try to file a
> > > > > > JIRA and fix it.
> > > > > >
> > > > > > As for skippedDueToDeserializationError and skipped-records-rate,
> > > > > > there is an open JIRA discussing this:
> > > > > > https://issues.apache.org/jira/browse/KAFKA-6376, just FYI.
> > > > > >
> > > > > >
> > > > > > Could you share which version of Kafka you observed this issue on?
> > > > > >
> > > > > > Guozhang
> > > > > >
> > > > > > On Fri, Jan 26, 2018 at 6:30 AM, Srikanth  >
> > > > wrote:
> > > > > >
> > > > > >> Hello,
> > > > > >>
> > > > > >> As per doc when LogAndContinueExceptionHandler is used it will
> set
> > > > > >> skippedDueToDeserializationError-rate metric to indicate
> > > > > deserialization
> > > > > >> error.
> > > > > >> I notice that it is never set. Instead skipped-records-rate is
> > set.
> > > My
> > > > > >> understanding was that skipped-records-rate is set due to
> > timestamp
> > > > > >> extraction errors.
> > > > > >>
> > > > > >> Ex, I sent a few invalid records to a topic and was able to see
> > > > > exception
> > > > > >> during deserialization.
> > > > > >>
> > > > > >> org.apache.kafka.common.errors.SerializationException: Error
> > > > > >> deserializing
> > > > > >> Avro message for id -1
> > > > > >> Caused by: org.apache.kafka.common.
> errors.SerializationException:
> > > > > Unknown
> > > > > >> magic byte!
> > > > > >> 18/01/24 06:50:09 WARN StreamThread: Exception caught during
> > > > > >> Deserialization, taskId: 0_0, topic: docker.event.1, partition:
> 0,
> > > > > offset:
> > > > > >> 3764
> > > > > >>
> > > > > >> These incremented skipped-records-[rate|total].
> > > > > >>
> > > > > >> Thanks,
> > > > > >> Srikanth
> > > > > >>
> > > > > >
> > > > > >
> > > > > >
> > > > > > --
> > > > > > -- Guozhang
> > > > > >
> > > > >
> > > > >
> > > > >
> > > > > --
> > > > > -- Guozhang
> > > > >
> > > >
> > >
> > >
> > >
> > > --
> > > -- Guozhang
> > >
> >
>
>
>
> --
> -- Guozhang
>