Re: Kafka Monitoring

2017-12-15 Thread Irtiza Ali
Thank you Apostolis Glenis.

On Thu, Dec 14, 2017 at 7:30 PM, Apostolis Glenis 
wrote:

> I have also created a monitoring application for Kafka that uses
> prometheus.
> You can look at the source code here:
>
> https://github.com/aglenis/kafka_monitoring_pandas
>
> 2017-12-13 9:53 GMT+02:00 Irtiza Ali :
>
> > Ok thank you Michal
> >
> > On Tue, Dec 12, 2017 at 9:30 PM, Michal Michalski <
> > michal.michal...@zalando.ie> wrote:
> >
> > > Hi Irtiza,
> > >
> > > Unfortunately I don't know what could be the issue in your case - in my
> > > case all these metrics simply appear under appropriate keys
> > > (e.g. "kafka.controller") in the JSON response of the /list request -
> the
> > > same as the one you're making (I checked it right now to be sure). I
> also
> > > checked our Kafka broker configuration and we don't seem to use any
> > options
> > > related to metrics reporting (I don't even know if there are any...).
> > > Reading the Kafka documentation I'm under the impression that all the
> > > metrics should be available to you out-of-the-box, so I'm not sure why
> > > you can't see them, sorry :-(
> > >
> > > M.
> > >
> > >
> > > On 11 December 2017 at 14:11, Irtiza Ali  wrote:
> > >
> > > > Hi Michal,
> > > >
> > > > I have followed the steps below for configuring Jolokia with Kafka:
> > > >
> > > > - Install the Jolokia JVM agent jar from their site.
> > > >
> > > > - Copy the Jolokia jar into /opt/jolokia-agent/, creating the
> > > >   jolokia-agent directory since it doesn't exist already.
> > > >
> > > > - Move to the directory where your Kafka installation lives and run the
> > > >   following command:
> > > >
> > > >   $ export KAFKA_OPTS="$KAFKA_OPTS
> > > >   -javaagent:/opt/jolokia-agent/jolokia-jvm-1.3.7-agent.jar"
> > > >
> > > > - To expose the Kafka metrics, open the /kafka/bin/kafka-server-start.sh
> > > >   file, find the line (exec $base_dir/kafka-run-class.sh $EXTRA_ARGS
> > > >   kafka.Kafka "$@") and paste the line given below just above it:
> > > >
> > > >   export JMX_PORT=${JMX_PORT:-}
> > > >
> > > > - Restart ZooKeeper and the broker.
> > > >
> > > > I think the issue is with the way I am exposing the metrics. I followed
> > > > the following link to enable JMX on the Kafka broker:
> > > > https://stackoverflow.com/questions/36708384/enable-jmx-on-kafka-brokers
> > > > The issue that I am still facing is that when I list all the metrics
> > > > using the http://localhost:/jolokia/list address, I don't get metrics
> > > > related to:
> > > > :8074/jolokia/read/kafka.server:*
> > > > :8074/jolokia/read/kafka.controller:*
> > > > :8074/jolokia/read/kafka.log:*
> > > > :8074/jolokia/read/kafka.network:*
> > > >
> > > >
> > > > Can you check what I have done wrong?
> > > >
> > > > Thanks
> > > >
> > > > With Regards
> > > > Irtiza
> > > >
> > > > On Fri, Dec 8, 2017 at 8:19 PM, Michal Michalski <
> > > > michal.michal...@zalando.ie> wrote:
> > > >
> > > > > Hi,
> > > > >
> > > > > We have no modifications in that file - what we do is have a "wrapper"
> > > > > that's just a Docker "entrypoint" (a bash script) whose contents are:
> > > > >
> > > > > export KAFKA_OPTS="$KAFKA_OPTS
> > > > > -javaagent:/jolokia-jvm-agent.jar=port=8074,host=0.0.0.0"
> > > > > exec ${KAFKA_DIR}/bin/kafka-server-start.sh
> > > > > ${KAFKA_DIR}/config/server.properties
> > > > >
> > > > > Regarding your second question - we have an in-house monitoring
> > > solution
> > > > > that allows querying that endpoint and extracting metrics from the
> > JSON
> > > > > returned.
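
For illustration, a minimal Java sketch of querying such a Jolokia read
endpoint for a single broker MBean - the host name is a placeholder, the port
matches the entrypoint above, and the standard kafka.server BrokerTopicMetrics
MBean is used only as an example; a real monitoring job would parse the
"value" field out of the JSON rather than print the raw response:

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;

public class JolokiaMetricFetch {
    public static void main(String[] args) throws Exception {
        // Assumes the Jolokia agent listens on port 8074; "broker1" is a
        // placeholder host name.
        String mbean = "kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec";
        URL url = new URL("http://broker1:8074/jolokia/read/" + mbean);

        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setRequestMethod("GET");

        // Jolokia answers with a JSON document; here we just print it.
        try (BufferedReader in = new BufferedReader(
                new InputStreamReader(conn.getInputStream()))) {
            String line;
            while ((line = in.readLine()) != null) {
                System.out.println(line);
            }
        }
    }
}
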
> > > > >
> > > > > If you're asking about what you should monitor, I think these links
> > > will
> > > > > answer your question:
> > > > > https://docs.confluent.io/3.0.0/kafka/monitoring.html
> > > > > https://www.datadoghq.com/blog/monitoring-kafka-
> performance-metrics/
> > > > >
> > > > >
> > > > > On 8 December 2017 at 14:56, Irtiza Ali  wrote:
> > > > >
> > > > > > Thanks Michal, could you kindly send me your kafka-run-class.sh and
> > > > > > kafka-server-start.sh files, so I can look at what you have done?
> > > > > > I have done the same thing that you explained above, but when I hit
> > > > > > <http://localhost:/jolokia/list> I only get metrics for ZooKeeper,
> > > > > > not the metrics above.
> > > > > >
> > > > > > How are you using Jolokia for monitoring the Kafka cluster?
> > > > > >
> > > > > > Thanks in advance
> > > > > >
> > > > > > On Fri, Dec 8, 2017 at 3:10 PM, Michal Michalski <
> > > > > > michal.michal...@zalando.ie> wrote:
> > > > > >
> > > > > > > Hi Irtiza,
> > > > > > >
> > > > > > > I don't have any tutorial, but I can tell you what we do :-)
> > > > > > >
> > > > > > > First of all we have Jolokia agent jar included in our Kafka

Installing and Running Kafka

2017-12-15 Thread Karl Keller
Hello,

I’ve been trying most of the afternoon to get Kafka installed and running the 
basic quick start.

I am running into the following errors related to firing up zookeeper.  From 
the kafka directory:

Andreas-iMac:kafka_2.11-1.0.0 Andrea$ bin/zookeeper-server-start.sh 
config/zookeeper.properties
usage: dirname path
Classpath is empty. Please build the project first e.g. by running './gradlew 
jar -Pscala_version=2.11.11'

Then I went ahead and moved into the gradle directory, created the gradle
wrapper and then executed the suggested build command shown below. I'm stuck on
the "jar" task not being found in the root project.

Could you suggest a next step?  Thanks - Karl

Andreas-iMac:4.4 Andrea$ ./gradlew jar -Pscala_version=2.11.11

FAILURE: Build failed with an exception.

* What went wrong:
Task 'jar' not found in root project '4.4'.

* Try:
Run gradlew tasks to get a list of available tasks. Run with --stacktrace 
option to get the stack trace. Run with --info or --debug option to get more 
log output. Run with --scan to get full insights.

* Get more help at https://help.gradle.org

BUILD FAILED in 1s


Re: Testing Punctuation in Kafka Streams

2017-12-15 Thread Matthias J. Sax
I am afraid that atm there is no good support for this... :(

However, we plan to release official test utilities soon (planned for
v1.1) that should contain proper support for punctuations.

So stay tuned.


-Matthias


On 12/15/17 7:31 AM, Tom Wessels wrote:
> Howdy. I've been using the ProcessorTopologyTestDriver for testing my Kafka
> Streams topologies, and it's worked great. But now I'd like to test the
> punctuation functionality in my topology, and I don't see anything in
> ProcessorTopologyTestDriver that allows for that. The KStreamTestDriver has
> a punctuate() method available, but it looks to be for the older
> (pre-1.0.0) punctuation functionality. So I modified the
> ProcessorTopologyTestDriver to also call:
> 
> task.maybePunctuateStreamTime();
> task.maybePunctuateSystemTime();
> 
> within ProcessorTopologyTestDriver's process() method, right after
> task.commit();
> 
> Is there a better way to test punctuation functionality?
> 





Re: Using Custom Partitioner in Streams

2017-12-15 Thread Matthias J. Sax
It's not recommended to write a custom partitioner because it's pretty
difficult to write a correct one. There are many dependencies and you
need deep knowledge of Kafka Streams internals to get it right.
Otherwise, your custom partitioner breaks Kafka Streams.

That is the reason why it's not documented...

I'm not sure, though, what you are trying to achieve in the first place.
What do you mean by

> I want to make sure that during map phase, the keys
>> produced adhere to the customized partitioner.

Maybe you can achieve what you want differently.


-Matthias

On 12/15/17 1:19 AM, Sameer Kumar wrote:
> Hi,
> 
> I want to use the custom partitioner in streams, I couldnt find the same in
> the documentation. I want to make sure that during map phase, the keys
> produced adhere to the customized partitioner.
> 
> -Sameer.
> 





How to get the start and end of partition from kafka

2017-12-15 Thread ??????
Hello, I want to get the start and end offsets of a partition from Kafka by
API, such as AdminClient. How can I do this?
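
One way to read these offsets is through the Java consumer, which exposes
beginningOffsets() and endOffsets(); a minimal sketch is below (broker address
and topic name are placeholders, and this uses the consumer API rather than
AdminClient, which at this version does not, to my knowledge, expose offset
lookups directly):

import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class PartitionOffsets {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker
        props.put("key.deserializer", ByteArrayDeserializer.class.getName());
        props.put("value.deserializer", ByteArrayDeserializer.class.getName());

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            TopicPartition tp = new TopicPartition("my-topic", 0); // placeholder topic
            // Earliest available offset (the "start" of the partition).
            Map<TopicPartition, Long> start =
                consumer.beginningOffsets(Collections.singletonList(tp));
            // Offset of the next message to be written (the "end").
            Map<TopicPartition, Long> end =
                consumer.endOffsets(Collections.singletonList(tp));
            System.out.println("start=" + start.get(tp) + " end=" + end.get(tp));
        }
    }
}
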

Testing Punctuation in Kafka Streams

2017-12-15 Thread Tom Wessels
Howdy. I've been using the ProcessorTopologyTestDriver for testing my Kafka
Streams topologies, and it's worked great. But now I'd like to test the
punctuation functionality in my topology, and I don't see anything in
ProcessorTopologyTestDriver that allows for that. The KStreamTestDriver has
a punctuate() method available, but it looks to be for the older
(pre-1.0.0) punctuation functionality. So I modified the
ProcessorTopologyTestDriver to also call:

task.maybePunctuateStreamTime();
task.maybePunctuateSystemTime();

within ProcessorTopologyTestDriver's process() method, right after
task.commit();

Is there a better way to test punctuation functionality?
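
For context, the newer (1.0) punctuation functionality referred to here is
registered from a Processor via ProcessorContext#schedule with a
PunctuationType; a minimal sketch of a processor using it follows (the class
name, the 60-second interval and the forwarded "tick" record are made up for
illustration):

import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.PunctuationType;

// Minimal sketch of a processor using the 1.0-style punctuation API.
public class PeriodicFlushProcessor extends AbstractProcessor<String, String> {

    @Override
    public void init(ProcessorContext context) {
        super.init(context);
        // STREAM_TIME punctuations fire as record timestamps advance, which is
        // what the maybePunctuateStreamTime() call mentioned above drives;
        // WALL_CLOCK_TIME corresponds to maybePunctuateSystemTime().
        context.schedule(60_000L, PunctuationType.STREAM_TIME, timestamp -> {
            // Periodic work goes here, e.g. forwarding an aggregate downstream.
            context.forward("tick", Long.toString(timestamp));
            context.commit();
        });
    }

    @Override
    public void process(String key, String value) {
        // Regular per-record processing would go here.
    }
}
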


Re: Kafka isync replica

2017-12-15 Thread Tarun Garg
I am using Kafka 0.11.0.1 and Kafka seems to have recovered by itself.
I will post my findings.


> On Dec 14, 2017, at 2:18 PM, Ted Yu  wrote:
> 
> Can you look at the log from controller to see if there is some clue
> w.r.t. partition
> 82 ?
> Was unclean leader election enabled ?
> 
> BTW which release of Kafka are you using ?
> 
> Cheers
> 
> On Thu, Dec 14, 2017 at 11:49 AM, Tarun Garg  wrote:
> 
>> I checked log.dir on all the nodes and found the index, log and time index
>> files are in sync (size and date of modification).
>> This caused more confusion.
>> 
>> How can I add this replica back to the ISR?
>> 
>> 
>> 
>>> On Dec 14, 2017, at 1:35 AM, UMESH CHAUDHARY 
>> wrote:
>>> 
>>> Do you find any messages on broker 3 w.r.t. Topic: XYZ Partition 82? Also
>>> broker 3 is in isr for other partitions (at least 83,84) so I don't see
>> any
>>> broker issue in this.
>>> 
>>> On Thu, 14 Dec 2017 at 01:23 Tarun Garg  wrote:
>>> 
 Hi,
 
 I have a Kafka cluster and it has been running for a long time. Just today I
 realized that some of the topic partitions are not in a good state: their ISR
 has been reduced from 3 to 2, even though the node of the lost replica is
 working.
 
 Like
 Topic: XYZ Partition 82 Leader: 1 Replicas:1,2,3 Isr:1,2 <- problem
 Topic: XYZ Partition 83 Leader: 2 Replicas:2,3,4 Isr:3,2,4
 Topic: XYZ Partition 84 Leader: 3 Replicas:3,4,5 Isr:3,4,5
 
 How can I bring replica 3 back into the ISR for partition 82?
 
 Any help is appreciated.
 
 Thanks
 Tarun
>> 
>> 



Kafka 1.0 ack=all issue

2017-12-15 Thread Rob Verkuylen
Hi,

After upgrading to 1.0 we're getting strange producer/broker behaviour not
experienced on <1.0.

As a test we run a single-threaded producer just sending "TEST" against our
cluster with the following producer settings, on a topic with replicas=3
and min.insync.replicas=2:
linger.ms=10
acks=all
retries=1000
batch=16k
retry.backoff.ms=1000
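
For reference, a minimal sketch of the kind of single-threaded test producer
described, using those settings (broker address, topic name and message count
are placeholders, and the callback simply counts acks to expose the lag
described below):

import java.util.Properties;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class AckLagTest {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092"); // placeholder
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());
        props.put("acks", "all");
        props.put("retries", "1000");
        props.put("linger.ms", "10");
        props.put("batch.size", "16384");
        props.put("retry.backoff.ms", "1000");

        AtomicLong sent = new AtomicLong();
        AtomicLong acked = new AtomicLong();

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            for (int i = 0; i < 1_000_000; i++) { // arbitrary test volume
                producer.send(new ProducerRecord<>("test-topic", "TEST"),
                        (metadata, exception) -> {
                            if (exception == null) {
                                acked.incrementAndGet();
                            } else {
                                exception.printStackTrace();
                            }
                        });
                long lag = sent.incrementAndGet() - acked.get();
                if (sent.get() % 10_000 == 0) {
                    System.out.println("sent=" + sent.get() + " ack-lag=" + lag);
                }
            }
            producer.flush();
        }
    }
}
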

Using the callback on send we immediately see a huge lag in the number of
acks coming back (600k+), while on 0.11 this hovers around 4k-20k max. At
the same time we see a drop in the producer's msg/s send rate; in about
90 seconds this drops to 0. After 10 minutes of silence we see a list of
network exceptions like these on all partitions: "Got error produce
response with correlation id X on topic-partition test-topic, retrying (999
attempts left). Error: NETWORK_EXCEPTION". Sends then resume briefly, but
the same behaviour quickly returns.

Now for the kicker: starting another thread after the first experiences
this, producing on the same topic with the same group id, will 'release' the
first thread; all acks are returned as normal and behaviour returns to
normal. No issues are experienced when acks=1. Kafka logs show no issues at
default log levels; I haven't had the opportunity to test further with more
fine-grained log levels. The brokers run default settings, with perhaps the
one special thing being that the inter-broker protocol is 1.0 while the
client protocol is still set to 0.9.0. The testing above was done with
clients ranging from 0.9 up to 1.0, all showing the same behaviour.

Downgrading the entire cluster back to 0.11.0.2 with the same settings, same
clients and same tests, and all is well. Could this be a bug?

Thanks,
  Rob


Re: Topic segments being deleted unexpectedly

2017-12-15 Thread Wim Van Leuven
OK. tnx!

On Fri, 15 Dec 2017 at 15:08 Damian Guy  wrote:

> I believe that just controls when the segment gets deleted from disk. It is
> removed from memory before that. So i don't believe that will help.
>
> On Fri, 15 Dec 2017 at 13:54 Wim Van Leuven <
> wim.vanleu...@highestpoint.biz>
> wrote:
>
> > So, in our setup, to provide the historic data on the platform, we would
> > have to define all topics with a retention period of the business time we
> > want to keep the data. However, on the intermediate topics, we would only
> > require the data to be there as long as necessary to be able to process
> the
> > data.
> >
> > Could we achieve this result by increasing the
> log.segment.delete.delay.ms
> > to e.g. 1d? Would this give us a timeframe of a day to process the data
> on
> > the intermediary topics? Or is this just wishful thinking?
> >
> > Thanks again!
> > -wim
> >
> > On Fri, 15 Dec 2017 at 14:23 Wim Van Leuven <
> > wim.vanleu...@highestpoint.biz>
> > wrote:
> >
> > > Is it really? I checked some records on kafka topics using commandline
> > > consumers to print key and timestamps and timestamps was logged as
> > > CreateTime:1513332523181
> > >
> > > But that would explain the issue. I'll adjust the retention on the
> topic
> > > and rerun.
> > >
> > > Thank you already for the insights!
> > > -wim
> > >
> > > On Fri, 15 Dec 2017 at 14:08 Damian Guy  wrote:
> > >
> > >> Hi,
> > >>
> > >> It is likely due to the timestamps you are extracting and using as the
> > >> record timestamp. Kafka uses the record timestamps for retention. I
> > >> suspect
> > >> this is causing your segments to roll and be deleted.
> > >>
> > >> Thanks,
> > >> Damian
> > >>
> > >> On Fri, 15 Dec 2017 at 11:49 Wim Van Leuven <
> > >> wim.vanleu...@highestpoint.biz>
> > >> wrote:
> > >>
> > >> > Hello all,
> > >> >
> > >> > We are running some Kafka Streams processing apps over Confluent OS
> > >> > (v3.2.0) and I'm seeing unexpected but 'consitent' behaviour
> regarding
> > >> > segment and index deletion.
> > >> >
> > >> > So, we have a topic 'input' that contains about 30M records to
> > ingest. A
> > >> > 1st processor transforms and pipes the data onto a second,
> > intermediate
> > >> > topic. A 2nd processor picks up the records, treats them and sends
> > them
> > >> > out.
> > >> >
> > >> > On our test environment the intermediate topic was set up with a
> > >> retention
> > >> > of 1 hour because we don't need to keep the data, only while
> > processing.
> > >> >
> > >> > On a test run we saw the 2nd processor exit with exceptions that it
> > >> > couldn't read offsets. We do not automatically reset because it
> should
> > >> not
> > >> > happen.
> > >> >
> > >> > org.apache.kafka.streams.errors.StreamsException: No valid committed
> > >> offset
> > >> > found for input topic cdr-raw-arch (partition 1) and no valid reset
> > >> policy
> > >> > configured. You need to set configuration parameter
> > "auto.offset.reset"
> > >> or
> > >> > specify a topic specific reset policy via
> > >> > KStreamBuilder#stream(StreamsConfig.AutoOffsetReset offsetReset,
> ...)
> > or
> > >> > KStreamBuilder#table(StreamsConfig.AutoOffsetReset offsetReset, ...)
> > >> >
> > >> > As we thought that it's the topic data expiring (processing takes
> > longer
> > >> > than 1 hour) we changed the topic to retain the data for 1 day.
> > >> >
> > >> > On rerun, we however saw exactly the same behaviour. That's why I'm
> > >> saying
> > >> > 'consistent behaviour' above.
> > >> >
> > >> > In the server logs, we see that kafka is rolling segments but
> > >> immediately
> > >> > scheduling them for deletion.
> > >> >
> > >> > [2017-12-15 11:01:46,992] INFO Rolled new log segment for
> > >> > 'cdr-raw-arch-1' in 1 ms. (kafka.log.Log)
> > >> > [2017-12-15 11:01:46,993] INFO Scheduling log segment 7330185 for
> log
> > >> > cdr-raw-arch-1 for deletion. (kafka.log.Log)
> > >> > [2017-12-15 11:01:46,995] INFO Rolled new log segment for
> > >> > 'cdr-raw-arch-0' in 2 ms. (kafka.log.Log)
> > >> > [2017-12-15 11:01:46,995] INFO Scheduling log segment 7335872 for
> log
> > >> > cdr-raw-arch-0 for deletion. (kafka.log.Log)
> > >> > [2017-12-15 11:02:46,995] INFO Deleting segment 7330185 from log
> > >> > cdr-raw-arch-1. (kafka.log.Log)
> > >> > [2017-12-15 11:02:46,996] INFO Deleting segment 7335872 from log
> > >> > cdr-raw-arch-0. (kafka.log.Log)
> > >> > [2017-12-15 11:02:47,170] INFO Deleting index
> > >> > /data/4/kafka/cdr-raw-arch-1/07330185.index.deleted
> > >> > (kafka.log.OffsetIndex)
> > >> > [2017-12-15 11:02:47,171] INFO Deleting index
> > >> > /data/4/kafka/cdr-raw-arch-1/07330185.timeindex.deleted
> > >> > (kafka.log.TimeIndex)
> > >> > [2017-12-15 11:02:47,172] INFO Deleting index
> > >> > /data/3/kafka/cdr-raw-arch-0/07335872.index.deleted
> > >> > (kafka.log.OffsetIndex)
> > >> > [2017-12-15 11:02:47,173] INFO Deleting index
> > >> > 

Re: Topic segments being deleted unexpectedly

2017-12-15 Thread Damian Guy
I believe that just controls when the segment gets deleted from disk. It is
removed from memory before that. So I don't believe that will help.

On Fri, 15 Dec 2017 at 13:54 Wim Van Leuven 
wrote:

> So, in our setup, to provide the historic data on the platform, we would
> have to define all topics with a retention period of the business time we
> want to keep the data. However, on the intermediate topics, we would only
> require the data to be there as long as necessary to be able to process the
> data.
>
> Could we achieve this result by increasing the log.segment.delete.delay.ms
> to e.g. 1d? Would this give us a timeframe of a day to process the data on
> the intermediary topics? Or is this just wishful thinking?
>
> Thanks again!
> -wim
>
> On Fri, 15 Dec 2017 at 14:23 Wim Van Leuven <
> wim.vanleu...@highestpoint.biz>
> wrote:
>
> > Is it really? I checked some records on kafka topics using commandline
> > consumers to print key and timestamps and timestamps was logged as
> > CreateTime:1513332523181
> >
> > But that would explain the issue. I'll adjust the retention on the topic
> > and rerun.
> >
> > Thank you already for the insights!
> > -wim
> >
> > On Fri, 15 Dec 2017 at 14:08 Damian Guy  wrote:
> >
> >> Hi,
> >>
> >> It is likely due to the timestamps you are extracting and using as the
> >> record timestamp. Kafka uses the record timestamps for retention. I
> >> suspect
> >> this is causing your segments to roll and be deleted.
> >>
> >> Thanks,
> >> Damian
> >>
> >> On Fri, 15 Dec 2017 at 11:49 Wim Van Leuven <
> >> wim.vanleu...@highestpoint.biz>
> >> wrote:
> >>
> >> > Hello all,
> >> >
> >> > We are running some Kafka Streams processing apps over Confluent OS
> >> > (v3.2.0) and I'm seeing unexpected but 'consitent' behaviour regarding
> >> > segment and index deletion.
> >> >
> >> > So, we have a topic 'input' that contains about 30M records to
> ingest. A
> >> > 1st processor transforms and pipes the data onto a second,
> intermediate
> >> > topic. A 2nd processor picks up the records, treats them and sends
> them
> >> > out.
> >> >
> >> > On our test environment the intermediate topic was set up with a
> >> retention
> >> > of 1 hour because we don't need to keep the data, only while
> processing.
> >> >
> >> > On a test run we saw the 2nd processor exit with exceptions that it
> >> > couldn't read offsets. We do not automatically reset because it should
> >> not
> >> > happen.
> >> >
> >> > org.apache.kafka.streams.errors.StreamsException: No valid committed
> >> offset
> >> > found for input topic cdr-raw-arch (partition 1) and no valid reset
> >> policy
> >> > configured. You need to set configuration parameter
> "auto.offset.reset"
> >> or
> >> > specify a topic specific reset policy via
> >> > KStreamBuilder#stream(StreamsConfig.AutoOffsetReset offsetReset, ...)
> or
> >> > KStreamBuilder#table(StreamsConfig.AutoOffsetReset offsetReset, ...)
> >> >
> >> > As we thought that it's the topic data expiring (processing takes
> longer
> >> > than 1 hour) we changed the topic to retain the data for 1 day.
> >> >
> >> > On rerun, we however saw exactly the same behaviour. That's why I'm
> >> saying
> >> > 'consistent behaviour' above.
> >> >
> >> > In the server logs, we see that kafka is rolling segments but
> >> immediately
> >> > scheduling them for deletion.
> >> >
> >> > [2017-12-15 11:01:46,992] INFO Rolled new log segment for
> >> > 'cdr-raw-arch-1' in 1 ms. (kafka.log.Log)
> >> > [2017-12-15 11:01:46,993] INFO Scheduling log segment 7330185 for log
> >> > cdr-raw-arch-1 for deletion. (kafka.log.Log)
> >> > [2017-12-15 11:01:46,995] INFO Rolled new log segment for
> >> > 'cdr-raw-arch-0' in 2 ms. (kafka.log.Log)
> >> > [2017-12-15 11:01:46,995] INFO Scheduling log segment 7335872 for log
> >> > cdr-raw-arch-0 for deletion. (kafka.log.Log)
> >> > [2017-12-15 11:02:46,995] INFO Deleting segment 7330185 from log
> >> > cdr-raw-arch-1. (kafka.log.Log)
> >> > [2017-12-15 11:02:46,996] INFO Deleting segment 7335872 from log
> >> > cdr-raw-arch-0. (kafka.log.Log)
> >> > [2017-12-15 11:02:47,170] INFO Deleting index
> >> > /data/4/kafka/cdr-raw-arch-1/07330185.index.deleted
> >> > (kafka.log.OffsetIndex)
> >> > [2017-12-15 11:02:47,171] INFO Deleting index
> >> > /data/4/kafka/cdr-raw-arch-1/07330185.timeindex.deleted
> >> > (kafka.log.TimeIndex)
> >> > [2017-12-15 11:02:47,172] INFO Deleting index
> >> > /data/3/kafka/cdr-raw-arch-0/07335872.index.deleted
> >> > (kafka.log.OffsetIndex)
> >> > [2017-12-15 11:02:47,173] INFO Deleting index
> >> > /data/3/kafka/cdr-raw-arch-0/07335872.timeindex.deleted
> >> > (kafka.log.TimeIndex)
> >> >
> >> >
> >> > However, I do not understand the behaviour: Why is kafka deleting the
> >> data
> >> > on the intermediary topic before it got processed? Almost immediately
> >> even?
> >> >
> >> > We do use timestamp extractors to pull business time from 

Re: Topic segments being deleted unexpectedly

2017-12-15 Thread Wim Van Leuven
So, in our setup, to provide the historic data on the platform, we would
have to define all topics with a retention period of the business time we
want to keep the data. However, on the intermediate topics, we would only
require the data to be there as long as necessary to be able to process the
data.

Could we achieve this result by increasing the log.segment.delete.delay.ms
to e.g. 1d? Would this give us a timeframe of a day to process the data on
the intermediary topics? Or is this just wishful thinking?

Thanks again!
-wim

On Fri, 15 Dec 2017 at 14:23 Wim Van Leuven 
wrote:

> Is it really? I checked some records on kafka topics using commandline
> consumers to print key and timestamps and timestamps was logged as
> CreateTime:1513332523181
>
> But that would explain the issue. I'll adjust the retention on the topic
> and rerun.
>
> Thank you already for the insights!
> -wim
>
> On Fri, 15 Dec 2017 at 14:08 Damian Guy  wrote:
>
>> Hi,
>>
>> It is likely due to the timestamps you are extracting and using as the
>> record timestamp. Kafka uses the record timestamps for retention. I
>> suspect
>> this is causing your segments to roll and be deleted.
>>
>> Thanks,
>> Damian
>>
>> On Fri, 15 Dec 2017 at 11:49 Wim Van Leuven <
>> wim.vanleu...@highestpoint.biz>
>> wrote:
>>
>> > Hello all,
>> >
>> > We are running some Kafka Streams processing apps over Confluent OS
>> > (v3.2.0) and I'm seeing unexpected but 'consitent' behaviour regarding
>> > segment and index deletion.
>> >
>> > So, we have a topic 'input' that contains about 30M records to ingest. A
>> > 1st processor transforms and pipes the data onto a second, intermediate
>> > topic. A 2nd processor picks up the records, treats them and sends them
>> > out.
>> >
>> > On our test environment the intermediate topic was set up with a
>> retention
>> > of 1 hour because we don't need to keep the data, only while processing.
>> >
>> > On a test run we saw the 2nd processor exit with exceptions that it
>> > couldn't read offsets. We do not automatically reset because it should
>> not
>> > happen.
>> >
>> > org.apache.kafka.streams.errors.StreamsException: No valid committed
>> offset
>> > found for input topic cdr-raw-arch (partition 1) and no valid reset
>> policy
>> > configured. You need to set configuration parameter "auto.offset.reset"
>> or
>> > specify a topic specific reset policy via
>> > KStreamBuilder#stream(StreamsConfig.AutoOffsetReset offsetReset, ...) or
>> > KStreamBuilder#table(StreamsConfig.AutoOffsetReset offsetReset, ...)
>> >
>> > As we thought that it's the topic data expiring (processing takes longer
>> > than 1 hour) we changed the topic to retain the data for 1 day.
>> >
>> > On rerun, we however saw exactly the same behaviour. That's why I'm
>> saying
>> > 'consistent behaviour' above.
>> >
>> > In the server logs, we see that kafka is rolling segments but
>> immediately
>> > scheduling them for deletion.
>> >
>> > [2017-12-15 11:01:46,992] INFO Rolled new log segment for
>> > 'cdr-raw-arch-1' in 1 ms. (kafka.log.Log)
>> > [2017-12-15 11:01:46,993] INFO Scheduling log segment 7330185 for log
>> > cdr-raw-arch-1 for deletion. (kafka.log.Log)
>> > [2017-12-15 11:01:46,995] INFO Rolled new log segment for
>> > 'cdr-raw-arch-0' in 2 ms. (kafka.log.Log)
>> > [2017-12-15 11:01:46,995] INFO Scheduling log segment 7335872 for log
>> > cdr-raw-arch-0 for deletion. (kafka.log.Log)
>> > [2017-12-15 11:02:46,995] INFO Deleting segment 7330185 from log
>> > cdr-raw-arch-1. (kafka.log.Log)
>> > [2017-12-15 11:02:46,996] INFO Deleting segment 7335872 from log
>> > cdr-raw-arch-0. (kafka.log.Log)
>> > [2017-12-15 11:02:47,170] INFO Deleting index
>> > /data/4/kafka/cdr-raw-arch-1/07330185.index.deleted
>> > (kafka.log.OffsetIndex)
>> > [2017-12-15 11:02:47,171] INFO Deleting index
>> > /data/4/kafka/cdr-raw-arch-1/07330185.timeindex.deleted
>> > (kafka.log.TimeIndex)
>> > [2017-12-15 11:02:47,172] INFO Deleting index
>> > /data/3/kafka/cdr-raw-arch-0/07335872.index.deleted
>> > (kafka.log.OffsetIndex)
>> > [2017-12-15 11:02:47,173] INFO Deleting index
>> > /data/3/kafka/cdr-raw-arch-0/07335872.timeindex.deleted
>> > (kafka.log.TimeIndex)
>> >
>> >
>> > However, I do not understand the behaviour: Why is kafka deleting the
>> data
>> > on the intermediary topic before it got processed? Almost immediately
>> even?
>> >
>> > We do use timestamp extractors to pull business time from the records.
>> Is
>> > that taken into account for retention time? Or is retention only based
>> on
>> > times of the files on disk?
>> >
>> > Thank you to shed any light on this problem!
>> >
>> > Kind regards!
>> > -wim
>> >
>>
>


Re: Topic segments being deleted unexpectedly

2017-12-15 Thread Wim Van Leuven
Is it really? I checked some records on the Kafka topics using command-line
consumers to print keys and timestamps, and the timestamps were logged as
CreateTime:1513332523181

But that would explain the issue. I'll adjust the retention on the topic
and rerun.

Thank you already for the insights!
-wim

On Fri, 15 Dec 2017 at 14:08 Damian Guy  wrote:

> Hi,
>
> It is likely due to the timestamps you are extracting and using as the
> record timestamp. Kafka uses the record timestamps for retention. I suspect
> this is causing your segments to roll and be deleted.
>
> Thanks,
> Damian
>
> On Fri, 15 Dec 2017 at 11:49 Wim Van Leuven <
> wim.vanleu...@highestpoint.biz>
> wrote:
>
> > Hello all,
> >
> > We are running some Kafka Streams processing apps over Confluent OS
> > (v3.2.0) and I'm seeing unexpected but 'consitent' behaviour regarding
> > segment and index deletion.
> >
> > So, we have a topic 'input' that contains about 30M records to ingest. A
> > 1st processor transforms and pipes the data onto a second, intermediate
> > topic. A 2nd processor picks up the records, treats them and sends them
> > out.
> >
> > On our test environment the intermediate topic was set up with a
> retention
> > of 1 hour because we don't need to keep the data, only while processing.
> >
> > On a test run we saw the 2nd processor exit with exceptions that it
> > couldn't read offsets. We do not automatically reset because it should
> not
> > happen.
> >
> > org.apache.kafka.streams.errors.StreamsException: No valid committed
> offset
> > found for input topic cdr-raw-arch (partition 1) and no valid reset
> policy
> > configured. You need to set configuration parameter "auto.offset.reset"
> or
> > specify a topic specific reset policy via
> > KStreamBuilder#stream(StreamsConfig.AutoOffsetReset offsetReset, ...) or
> > KStreamBuilder#table(StreamsConfig.AutoOffsetReset offsetReset, ...)
> >
> > As we thought that it's the topic data expiring (processing takes longer
> > than 1 hour) we changed the topic to retain the data for 1 day.
> >
> > On rerun, we however saw exactly the same behaviour. That's why I'm
> saying
> > 'consistent behaviour' above.
> >
> > In the server logs, we see that kafka is rolling segments but immediately
> > scheduling them for deletion.
> >
> > [2017-12-15 11:01:46,992] INFO Rolled new log segment for
> > 'cdr-raw-arch-1' in 1 ms. (kafka.log.Log)
> > [2017-12-15 11:01:46,993] INFO Scheduling log segment 7330185 for log
> > cdr-raw-arch-1 for deletion. (kafka.log.Log)
> > [2017-12-15 11:01:46,995] INFO Rolled new log segment for
> > 'cdr-raw-arch-0' in 2 ms. (kafka.log.Log)
> > [2017-12-15 11:01:46,995] INFO Scheduling log segment 7335872 for log
> > cdr-raw-arch-0 for deletion. (kafka.log.Log)
> > [2017-12-15 11:02:46,995] INFO Deleting segment 7330185 from log
> > cdr-raw-arch-1. (kafka.log.Log)
> > [2017-12-15 11:02:46,996] INFO Deleting segment 7335872 from log
> > cdr-raw-arch-0. (kafka.log.Log)
> > [2017-12-15 11:02:47,170] INFO Deleting index
> > /data/4/kafka/cdr-raw-arch-1/07330185.index.deleted
> > (kafka.log.OffsetIndex)
> > [2017-12-15 11:02:47,171] INFO Deleting index
> > /data/4/kafka/cdr-raw-arch-1/07330185.timeindex.deleted
> > (kafka.log.TimeIndex)
> > [2017-12-15 11:02:47,172] INFO Deleting index
> > /data/3/kafka/cdr-raw-arch-0/07335872.index.deleted
> > (kafka.log.OffsetIndex)
> > [2017-12-15 11:02:47,173] INFO Deleting index
> > /data/3/kafka/cdr-raw-arch-0/07335872.timeindex.deleted
> > (kafka.log.TimeIndex)
> >
> >
> > However, I do not understand the behaviour: Why is kafka deleting the
> data
> > on the intermediary topic before it got processed? Almost immediately
> even?
> >
> > We do use timestamp extractors to pull business time from the records. Is
> > that taken into account for retention time? Or is retention only based on
> > times of the files on disk?
> >
> > Thank you to shed any light on this problem!
> >
> > Kind regards!
> > -wim
> >
>


Re: Topic segments being deleted unexpectedly

2017-12-15 Thread Damian Guy
Hi,

It is likely due to the timestamps you are extracting and using as the
record timestamp. Kafka uses the record timestamps for retention. I suspect
this is causing your segments to roll and be deleted.

Thanks,
Damian

On Fri, 15 Dec 2017 at 11:49 Wim Van Leuven 
wrote:

> Hello all,
>
> We are running some Kafka Streams processing apps over Confluent OS
> (v3.2.0) and I'm seeing unexpected but 'consitent' behaviour regarding
> segment and index deletion.
>
> So, we have a topic 'input' that contains about 30M records to ingest. A
> 1st processor transforms and pipes the data onto a second, intermediate
> topic. A 2nd processor picks up the records, treats them and sends them
> out.
>
> On our test environment the intermediate topic was set up with a retention
> of 1 hour because we don't need to keep the data, only while processing.
>
> On a test run we saw the 2nd processor exit with exceptions that it
> couldn't read offsets. We do not automatically reset because it should not
> happen.
>
> org.apache.kafka.streams.errors.StreamsException: No valid committed offset
> found for input topic cdr-raw-arch (partition 1) and no valid reset policy
> configured. You need to set configuration parameter "auto.offset.reset" or
> specify a topic specific reset policy via
> KStreamBuilder#stream(StreamsConfig.AutoOffsetReset offsetReset, ...) or
> KStreamBuilder#table(StreamsConfig.AutoOffsetReset offsetReset, ...)
>
> As we thought that it's the topic data expiring (processing takes longer
> than 1 hour) we changed the topic to retain the data for 1 day.
>
> On rerun, we however saw exactly the same behaviour. That's why I'm saying
> 'consistent behaviour' above.
>
> In the server logs, we see that kafka is rolling segments but immediately
> scheduling them for deletion.
>
> [2017-12-15 11:01:46,992] INFO Rolled new log segment for
> 'cdr-raw-arch-1' in 1 ms. (kafka.log.Log)
> [2017-12-15 11:01:46,993] INFO Scheduling log segment 7330185 for log
> cdr-raw-arch-1 for deletion. (kafka.log.Log)
> [2017-12-15 11:01:46,995] INFO Rolled new log segment for
> 'cdr-raw-arch-0' in 2 ms. (kafka.log.Log)
> [2017-12-15 11:01:46,995] INFO Scheduling log segment 7335872 for log
> cdr-raw-arch-0 for deletion. (kafka.log.Log)
> [2017-12-15 11:02:46,995] INFO Deleting segment 7330185 from log
> cdr-raw-arch-1. (kafka.log.Log)
> [2017-12-15 11:02:46,996] INFO Deleting segment 7335872 from log
> cdr-raw-arch-0. (kafka.log.Log)
> [2017-12-15 11:02:47,170] INFO Deleting index
> /data/4/kafka/cdr-raw-arch-1/07330185.index.deleted
> (kafka.log.OffsetIndex)
> [2017-12-15 11:02:47,171] INFO Deleting index
> /data/4/kafka/cdr-raw-arch-1/07330185.timeindex.deleted
> (kafka.log.TimeIndex)
> [2017-12-15 11:02:47,172] INFO Deleting index
> /data/3/kafka/cdr-raw-arch-0/07335872.index.deleted
> (kafka.log.OffsetIndex)
> [2017-12-15 11:02:47,173] INFO Deleting index
> /data/3/kafka/cdr-raw-arch-0/07335872.timeindex.deleted
> (kafka.log.TimeIndex)
>
>
> However, I do not understand the behaviour: Why is kafka deleting the data
> on the intermediary topic before it got processed? Almost immediately even?
>
> We do use timestamp extractors to pull business time from the records. Is
> that taken into account for retention time? Or is retention only based on
> times of the files on disk?
>
> Thank you to shed any light on this problem!
>
> Kind regards!
> -wim
>


Topic segments being deleted unexpectedly

2017-12-15 Thread Wim Van Leuven
Hello all,

We are running some Kafka Streams processing apps over Confluent OS
(v3.2.0) and I'm seeing unexpected but 'consistent' behaviour regarding
segment and index deletion.

So, we have a topic 'input' that contains about 30M records to ingest. A
1st processor transforms and pipes the data onto a second, intermediate
topic. A 2nd processor picks up the records, treats them and sends them
out.

On our test environment the intermediate topic was set up with a retention
of 1 hour because we don't need to keep the data, only while processing.

On a test run we saw the 2nd processor exit with exceptions that it
couldn't read offsets. We do not automatically reset because it should not
happen.

org.apache.kafka.streams.errors.StreamsException: No valid committed offset
found for input topic cdr-raw-arch (partition 1) and no valid reset policy
configured. You need to set configuration parameter "auto.offset.reset" or
specify a topic specific reset policy via
KStreamBuilder#stream(StreamsConfig.AutoOffsetReset offsetReset, ...) or
KStreamBuilder#table(StreamsConfig.AutoOffsetReset offsetReset, ...)

As we thought that it's the topic data expiring (processing takes longer
than 1 hour) we changed the topic to retain the data for 1 day.

On rerun, we however saw exactly the same behaviour. That's why I'm saying
'consistent behaviour' above.

In the server logs, we see that kafka is rolling segments but immediately
scheduling them for deletion.

[2017-12-15 11:01:46,992] INFO Rolled new log segment for
'cdr-raw-arch-1' in 1 ms. (kafka.log.Log)
[2017-12-15 11:01:46,993] INFO Scheduling log segment 7330185 for log
cdr-raw-arch-1 for deletion. (kafka.log.Log)
[2017-12-15 11:01:46,995] INFO Rolled new log segment for
'cdr-raw-arch-0' in 2 ms. (kafka.log.Log)
[2017-12-15 11:01:46,995] INFO Scheduling log segment 7335872 for log
cdr-raw-arch-0 for deletion. (kafka.log.Log)
[2017-12-15 11:02:46,995] INFO Deleting segment 7330185 from log
cdr-raw-arch-1. (kafka.log.Log)
[2017-12-15 11:02:46,996] INFO Deleting segment 7335872 from log
cdr-raw-arch-0. (kafka.log.Log)
[2017-12-15 11:02:47,170] INFO Deleting index
/data/4/kafka/cdr-raw-arch-1/07330185.index.deleted
(kafka.log.OffsetIndex)
[2017-12-15 11:02:47,171] INFO Deleting index
/data/4/kafka/cdr-raw-arch-1/07330185.timeindex.deleted
(kafka.log.TimeIndex)
[2017-12-15 11:02:47,172] INFO Deleting index
/data/3/kafka/cdr-raw-arch-0/07335872.index.deleted
(kafka.log.OffsetIndex)
[2017-12-15 11:02:47,173] INFO Deleting index
/data/3/kafka/cdr-raw-arch-0/07335872.timeindex.deleted
(kafka.log.TimeIndex)


However, I do not understand the behaviour: Why is kafka deleting the data
on the intermediary topic before it got processed? Almost immediately even?

We do use timestamp extractors to pull business time from the records. Is
that taken into account for retention time? Or is retention only based on
times of the files on disk?

Thank you for shedding any light on this problem!

Kind regards!
-wim
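
For reference, a custom extractor of the kind mentioned implements the
TimestampExtractor interface; a minimal sketch is below (it assumes, purely
for illustration, a string value whose first comma-separated field is a
business timestamp in epoch milliseconds). The timestamp returned here is also
what Streams uses as the timestamp of the records it produces downstream,
which is presumably why it interacts with the time-based retention discussed
above:

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.processor.TimestampExtractor;

// Minimal sketch: assumes (for illustration only) record values like
// "1513332523181,...", i.e. a business timestamp as the first field.
public class BusinessTimeExtractor implements TimestampExtractor {

    @Override
    public long extract(ConsumerRecord<Object, Object> record, long previousTimestamp) {
        Object value = record.value();
        if (value instanceof String) {
            try {
                return Long.parseLong(((String) value).split(",", 2)[0]);
            } catch (NumberFormatException ignored) {
                // fall through to the default below
            }
        }
        // Fall back to the timestamp already carried by the record
        // (CreateTime set by the producer, or LogAppendTime).
        return record.timestamp();
    }
}
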


Re: Queries on Kafka Capacity

2017-12-15 Thread Brett Rann
You would add new brokers to the cluster, and then do a partition
reassignment to move some partitions to the new brokers.

In the simplest example:

Say you have 1 topic with 3 partitions.

partition 0: brokers: 1,2
partition 1: brokers: 2,3
partition 2: brokers: 3,1

If you added 3 more brokers, you could do a reassignment so it is:

partition 0: brokers: 1,2
partition 1: brokers: 3,4
partition 2: brokers: 5,6

Which could be done with the following partition reassignment json:

{"partitions":[
 {"topic": "test", "partition": 0, "replicas": [1,2] },
 {"topic": "test", "partition": 1, "replicas": [3,4] },
 {"topic": "test", "partition": 2, "replicas": [5,6] }
  ], "version":1
}


/usr/local/bin/kafka-reassign-partitions --execute --reassignment-json-file
reassignment.json --throttle 8000
and remember to verify:
/usr/local/bin/kafka-reassign-partitions --verify --reassignment-json-file
reassignment.json

In reality you'd have multiple topics and probably many partitions, but the
principle is the same.

Documentation here:
https://cwiki.apache.org/confluence/display/KAFKA/Replication+tools#Replicationtools-4.ReassignPartitionsTool


On Fri, Dec 15, 2017 at 12:12 PM, ajay chaudhary <
juniora...@yahoo.com.invalid> wrote:

>
> Adding the users list to this email for help on the queries below. Please
> help us.
> Regards, Ajay Chaudhary
>
> On Thursday 14 December 2017, 1:07:56 PM IST, ajay chaudhary <
> juniora...@yahoo.com> wrote:
>
>  Hi Team,
>
> This is Ajay, working with Yodlee India. We are trying to set up a Kafka
> cluster for streaming and we need some clarifications. Could you please
> help us understand this behavior?
>
> How do we add capacity to an existing Kafka cluster? Let us assume we have
> a Kafka cluster with 3 brokers and every broker has a single mount point
> allocated (log.dirs). We create a topic with some partitions, and after
> some days the mount point becomes 100% full, maybe due to unexpected data
> growth. How do we add space to the Kafka cluster so that existing
> partitions (say partition 0, whose original location is full) can make use
> of the newly provisioned space? How would new messages coming to this
> partition be handled? Please help us understand this.
>
> Regards, Ajay Chaudhary
>


Using Custom Partitioner in Streams

2017-12-15 Thread Sameer Kumar
Hi,

I want to use a custom partitioner in Streams, but I couldn't find it in
the documentation. I want to make sure that during the map phase, the keys
produced adhere to the customized partitioner.

-Sameer.
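
As noted in the reply above, this is discouraged, but for reference the hook
that does exist is StreamPartitioner, which can be supplied when writing a
stream out; a minimal sketch follows (topic names, serdes and the partitioning
rule are illustrative only, and the signatures shown are the 1.0 ones):

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.processor.StreamPartitioner;

public class CustomPartitioningExample {

    // Illustrative partitioner: routes by a prefix of the key.
    static class KeyPrefixPartitioner implements StreamPartitioner<String, String> {
        @Override
        public Integer partition(String key, String value, int numPartitions) {
            String prefix = key == null ? "" : key.split("-", 2)[0];
            return Math.floorMod(prefix.hashCode(), numPartitions);
        }
    }

    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> stream = builder.stream("input-topic"); // placeholder

        // The partitioner is applied when the stream is written out, so it
        // only controls this sink topic - not Streams-internal repartition
        // topics, which is part of why this is easy to get wrong.
        stream.to("output-topic",
                Produced.with(Serdes.String(), Serdes.String(),
                        new KeyPrefixPartitioner()));

        // builder.build() would then be passed to a KafkaStreams instance.
    }
}
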


Re: kafka 1.0/0.11.0.1 log message upgrade: Error processing fetch operation on partition __consumer_offsets-21 offset 200349244

2017-12-15 Thread Brett Rann
Another interesting datapoint:

Taking a deeper look at partition 21:

brann@kafka1:/data/kafka/logs/__consumer_offsets-21$ ls -la
total 20176
drwxr-xr-x    2 kafka kafka     4096 Dec 15 08:11 .
drwxr-xr-x 1605 kafka kafka   167936 Dec 15 08:31 ..
-rw-r--r--    1 kafka kafka        0 Dec 15 08:03 .index
-rw-r--r--    1 kafka kafka    20924 Dec 15 04:08 .log
-rw-r--r--    1 kafka kafka       12 Dec 15 08:03 .timeindex
-rw-r--r--    1 kafka kafka        0 Dec 15 08:03 000200248390.index
-rw-r--r--    1 kafka kafka     3662 Dec 15 07:53 000200248390.log
-rw-r--r--    1 kafka kafka       10 Dec 15 04:08 000200248390.snapshot
-rw-r--r--    1 kafka kafka       12 Dec 15 08:03 000200248390.timeindex
-rw-r--r--    1 kafka kafka       10 Dec 15 04:36 000200349263.snapshot
-rw-r--r--    1 kafka kafka       10 Dec 15 06:01 000200650903.snapshot
-rw-r--r--    1 kafka kafka 10485760 Dec 15 08:31 000201167309.index
-rw-r--r--    1 kafka kafka 15874568 Dec 15 08:31 000201167309.log
-rw-r--r--    1 kafka kafka       10 Dec 15 07:53 000201167309.snapshot
-rw-r--r--    1 kafka kafka 10485756 Dec 15 08:31 000201167309.timeindex
-rw-r--r--    1 kafka kafka      164 Dec 15 08:11 leader-epoch-checkpoint

*Why are there extra .snapshot files in there?*

These two are from before and after the offset mentioned in the error:

-rw-r--r--    1 kafka kafka       10 Dec 15 04:36 000200349263.snapshot
-rw-r--r--    1 kafka kafka       10 Dec 15 06:01 000200650903.snapshot


Taking a deeper look into the log file:

__consumer_offsets-21$ /data/kafka/kafka/bin/kafka-run-class.sh
kafka.tools.DumpLogSegments   --deep-iteration --files
000200248390.log | less

Dumping 000200248390.log
Starting offset: 200248390
offset: 201167253 position: 0 CreateTime: 1513324424113 isvalid: true
keysize: 75 valuesize: 28 magic: 2 compresscodec: NONE producerId: -1
producerEpoch: -1 sequence: 1 isTransactional: false headerKeys: []
offset: 201167258 position: 0 CreateTime: 1513324424113 isvalid: true
keysize: 75 valuesize: 28 magic: 2 compresscodec: NONE producerId: -1
producerEpoch: -1 sequence: 6 isTransactional: false headerKeys: []
...

Interestingly, for partition 21 the error is about offset 200349244. I'd
expect that to be in 000200248390.log, but when dumping that log
file manually it starts with offset 201167266.


Still stumped on:
why it is erroring about 201167266
why I can't find 201167266 in the log file for that partition (I've grepped
the output of DumpLogSegments for all 3 log files; it's not there)
why it is trying 201167266 - is it from the snapshot files? Is there some
surgery we can do to make it stop, safely? :)

On Fri, Dec 15, 2017 at 4:33 PM, Brett Rann  wrote:

> on `kafka_2.11-1.0.1-d04daf570` we are upgrading the log format from
> 0.9.0.1 to 0.11.0.1 and after the upgrade have set
>
> inter.broker.protocol.version=1.0
> log.message.format.version=0.11.0.1
>
> We have applied this upgrade to 5 clusters by upgrading broker 1, leaving
> it for a day, then coming back when happy to upgrade the remaining brokers.
>
>
> 4 of those upgrades went without issue.
>
>
> However in one, when we upgraded the remaining brokers, we now start
> seeing these errors on broker1:
>
> Error processing fetch operation on partition __consumer_offsets-21  offset 
> 200349244
>
> For 4 consumer offset partitions, all which happen to be led by 1.
>
> kafka-request-handler-3 72 ERROR kafka.server.ReplicaManager 
> 2017-12-15T07:39:40.380+ [ReplicaManager broker=1] Error processing fetch 
> operation on partition __consumer_offsets-21  offset 200349244
> kafka-request-handler-3 72 ERROR kafka.server.ReplicaManager 
> 2017-12-15T07:39:40.381+ [ReplicaManager broker=1] Error processing fetch 
> operation on partition __consumer_offsets-11  offset 188709568
> kafka-request-handler-3 72 ERROR kafka.server.ReplicaManager 
> 2017-12-15T07:39:40.381+ [ReplicaManager broker=1] Error processing fetch 
> operation on partition __consumer_offsets-1  offset 2045483676
> kafka-request-handler-5 74 ERROR kafka.server.ReplicaManager 
> 2017-12-15T07:39:41.672+ [ReplicaManager broker=1] Error processing fetch 
> operation on partition __consumer_offsets-31  offset 235294887
>
> Every second or so.
>
> If we stop that broker, those errors simply shift to the next leader for
> those 4 partitions. And moving the partitions to completely new brokers
> just moves the errors.
>
> We only see this on kafka1. not the other 9 brokers which had the log
> message fromat upgraded a day or two later.
>
> Any suggestion on how to proceed? I'm not even sure yet if this is
> isolated to the cluster, or if it's related to a consumer misbehaving.
> Since our multiple clusters /should/ have the same set of
> producers/consumers working on 

kafka 1.0/0.11.0.1 log message upgrade: Error processing fetch operation on partition __consumer_offsets-21 offset 200349244

2017-12-15 Thread Brett Rann
on `kafka_2.11-1.0.1-d04daf570` we are upgrading the log format from
0.9.0.1 to 0.11.0.1 and after the upgrade have set

inter.broker.protocol.version=1.0
log.message.format.version=0.11.0.1

We have applied this upgrade to 5 clusters by upgrading broker 1, leaving
it for a day, then coming back when happy to upgrade the remaining brokers.


4 of those upgrades went without issue.


However in one, when we upgraded the remaining brokers, we now start seeing
these errors on broker1:

Error processing fetch operation on partition __consumer_offsets-21
offset 200349244

For 4 consumer offset partitions, all of which happen to be led by broker 1.

kafka-request-handler-3 72 ERROR kafka.server.ReplicaManager
2017-12-15T07:39:40.380+ [ReplicaManager broker=1] Error
processing fetch operation on partition __consumer_offsets-21  offset
200349244
kafka-request-handler-3 72 ERROR kafka.server.ReplicaManager
2017-12-15T07:39:40.381+ [ReplicaManager broker=1] Error
processing fetch operation on partition __consumer_offsets-11  offset
188709568
kafka-request-handler-3 72 ERROR kafka.server.ReplicaManager
2017-12-15T07:39:40.381+ [ReplicaManager broker=1] Error
processing fetch operation on partition __consumer_offsets-1  offset
2045483676
kafka-request-handler-5 74 ERROR kafka.server.ReplicaManager
2017-12-15T07:39:41.672+ [ReplicaManager broker=1] Error
processing fetch operation on partition __consumer_offsets-31  offset
235294887

Every second or so.

If we stop that broker, those errors simply shift to the next leader for
those 4 partitions. And moving the partitions to completely new brokers
just moves the errors.

We only see this on kafka1, not the other 9 brokers, which had the log
message format upgraded a day or two later.

Any suggestion on how to proceed? I'm not even sure yet if this is isolated
to the cluster, or if it's related to a consumer misbehaving.  Since our
multiple clusters /should/ have the same set of producers/consumers working
on them, I'm doubtful that it's a misbehaving client.