Re: Mirrormaker consumption slowness

2017-12-06 Thread Steve Miller
This kind of sounds to me like there’s packet loss somewhere and TCP is closing 
the window to try to limit congestion.  But from the snippets you posted, I 
didn’t see any sacks in the tcpdump output.  If there *are* sacks, that’d be a 
strong indicator of loss somewhere, whether it’s in the network or it’s in some 
host that’s being overwhelmed.

I didn’t have a chance to do the header math to see if TCP’s advertising a 
small window in the lossy case you posted.  But I figured I’d mention this just 
in case it’s useful.

-Steve

> On Dec 6, 2017, at 5:27 PM, tao xiao  wrote:
> 
> MirrorMaker is placed close to the target, and the send/receive buffer size
> is set to 10MB, which is the result of the bandwidth-delay product. The
> OS-level TCP buffer max has also been increased to 16MB.
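> For reference, a minimal sketch of what that tuning usually looks like
> (assuming Linux sysctls and the Java consumer/producer socket configs; the
> 10MB/16MB values are the ones above, the file names are assumptions):
> 
>     # OS side: raise the per-socket maximums to 16MB
>     sysctl -w net.core.rmem_max=16777216
>     sysctl -w net.core.wmem_max=16777216
>     sysctl -w net.ipv4.tcp_rmem='4096 87380 16777216'
>     sysctl -w net.ipv4.tcp_wmem='4096 65536 16777216'
> 
>     # MirrorMaker side: consumer.properties / producer.properties
>     # (10MB, i.e. the bandwidth-delay product)
>     receive.buffer.bytes=10485760
>     send.buffer.bytes=10485760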
> 
>> On Wed, 6 Dec 2017 at 15:19 Jan Filipiak  wrote:
>> 
>> Hi,
>> 
>> Two questions: is your MirrorMaker collocated with the source or the
>> target? What are the send and receive buffer sizes on the connections that
>> span across the WAN?
>> 
>> Hope we can get you some help.
>> 
>> Best jan
>> 
>> 
>> 
>>> On 06.12.2017 14:36, Xu, Zhaohui wrote:
>>> Any update on this issue?
>>> 
>>> We also ran into a similar situation recently. The mirrormaker is
>>> leveraged to replicate messages between clusters in different DCs, but
>>> sometimes a portion of the partitions have high consumer lag and tcpdump
>>> also shows a similar packet delivery pattern. The behavior is sort of weird
>>> and not self-explanatory. Wondering whether it has anything to do with
>>> the fact that the number of consumers is too large?  In our example, we have
>>> around 100 consumer connections per broker.
>>> 
>>> Regards,
>>> Jeff
>>> 
>>> On 12/5/17, 10:14 AM, "tao xiao"  wrote:
>>> 
>>> Hi,
>>> 
>>> any pointer will be highly appreciated
>>> 
 On Thu, 30 Nov 2017 at 14:56 tao xiao  wrote:
 
 Hi There,
 
 
 
 We are running into a weird situation when using Mirrormaker to replicate
 messages between Kafka clusters across datacenters, and we are reaching out
 in case you have encountered this kind of problem before or have some
 insight into this kind of issue.
 
 
 
 Here is the scenario. We have a deployment where we run 30 Mirrormaker
 instances on 30 different nodes. Each Mirrormaker instance is configured
 with num.streams=1, so only one consumer runs per instance. The topic to
 replicate is configured with 100 partitions and data is almost evenly
 distributed across all partitions. After running for a period of time, a
 weird thing happens: some of the Mirrormaker instances seem to slow down
 and consume at a relatively slow speed from the source Kafka cluster. The
 output of tcptrack shows the consume rate of the problematic instances
 dropped to ~1MB/s, while the other healthy instances consume at a rate of
 ~3MB/s. As a result, the consumer lag for the corresponding partitions
 keeps growing.
 
 
 
 
 After triggering a tcpdump, we noticed the traffic pattern on the tcp
 connections of the problematic Mirrormaker instances is very different from
 the others. Packets flowing on the problematic tcp connections are
 relatively small, and data (seq) and ack packets basically alternate one
 after another. On the other hand, the packets on healthy tcp connections
 come in a different pattern: several data packets arrive per ack packet.
 The screenshots below show the situation; both captures were taken on the
 same Mirrormaker node.
 
 
 
 problematic connection (ps. 10.kfk.kfk.kfk is the Kafka broker, 10.mm.mm.mm
 is the Mirrormaker node):
 
 
 https://imgur.com/Z3odjjT
 
 
 healthy connection
 
 
 https://imgur.com/w0A6qHT
 
 
 If we stop the problematic Mirrormaker instance, the other instances that
 take over the lagged partitions can consume messages quickly and catch up
 on the lag soon, so the broker in the source Kafka cluster appears to be
 fine. But if Mirrormaker itself causes the issue, how can one tcp
 connection be good while others are problematic, given that the connections
 are all established in the same manner by the Kafka library?
 
 
 
 Consumer configuration for the Mirrormaker instances is as below.
 
 

Re: Too Many Open Files

2016-08-01 Thread Steve Miller
Can you run lsof -p (pid) for whatever the pid is for your Kafka process?

For the fd limits you've set, I don't think subtlety is required: if there are 
a million-ish lines in the output, the fd limit is where you think it is, 
and if it's a lot lower than that, the limit isn't being applied properly 
somehow (maybe you are running this under, say, supervisord, and maybe its 
config is lowering the limit, or the limits for root are as you say but the 
limits for the kafka user aren't being set properly, that sort of thing).

If you do have 1M lines in the output, at least this might give you a place to 
start figuring out what's open and why.
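Something along these lines is usually enough to get both the count and a rough
breakdown (a sketch, assuming Linux /proc and a single broker process per host):

    # find the broker pid and count its open descriptors
    KAFKA_PID=$(pgrep -f kafka.Kafka)
    lsof -p "$KAFKA_PID" | wc -l

    # the limit that actually applies to the running process, not your login shell
    grep 'open files' /proc/"$KAFKA_PID"/limits

    # rough breakdown by descriptor type (regular files vs. sockets, etc.)
    lsof -p "$KAFKA_PID" | awk '{print $5}' | sort | uniq -c | sort -rn | head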

-Steve

> On Jul 31, 2016, at 4:14 PM, Kessiler Rodrigues  
> wrote:
> 
> I’m still experiencing this issue…
> 
> Here are the kafka logs.
> 
> [2016-07-31 20:10:35,658] ERROR Error while accepting connection 
> (kafka.network.Acceptor)
> java.io.IOException: Too many open files
>at sun.nio.ch.ServerSocketChannelImpl.accept0(Native Method)
>at 
> sun.nio.ch.ServerSocketChannelImpl.accept(ServerSocketChannelImpl.java:422)
>at 
> sun.nio.ch.ServerSocketChannelImpl.accept(ServerSocketChannelImpl.java:250)
>at kafka.network.Acceptor.accept(SocketServer.scala:323)
>at kafka.network.Acceptor.run(SocketServer.scala:268)
>at java.lang.Thread.run(Thread.java:745)
> [2016-07-31 20:10:35,658] ERROR Error while accepting connection 
> (kafka.network.Acceptor)
> java.io.IOException: Too many open files
>at sun.nio.ch.ServerSocketChannelImpl.accept0(Native Method)
>at 
> sun.nio.ch.ServerSocketChannelImpl.accept(ServerSocketChannelImpl.java:422)
>at 
> sun.nio.ch.ServerSocketChannelImpl.accept(ServerSocketChannelImpl.java:250)
>at kafka.network.Acceptor.accept(SocketServer.scala:323)
>at kafka.network.Acceptor.run(SocketServer.scala:268)
>at java.lang.Thread.run(Thread.java:745)
> [2016-07-31 20:10:35,658] ERROR Error while accepting connection 
> (kafka.network.Acceptor)
> java.io.IOException: Too many open files
>at sun.nio.ch.ServerSocketChannelImpl.accept0(Native Method)
>at 
> sun.nio.ch.ServerSocketChannelImpl.accept(ServerSocketChannelImpl.java:422)
>at 
> sun.nio.ch.ServerSocketChannelImpl.accept(ServerSocketChannelImpl.java:250)
>at kafka.network.Acceptor.accept(SocketServer.scala:323)
>at kafka.network.Acceptor.run(SocketServer.scala:268)
>at java.lang.Thread.run(Thread.java:745)
> 
> My ulimit is 1 million, how is that possible?
> 
> Can someone help with this? 
> 
> 
>> On Jul 30, 2016, at 5:05 AM, Kessiler Rodrigues  
>> wrote:
>> 
>> I have changed it a bit.
>> 
>> I have 10 brokers and 20k topics with 1 partition each. 
>> 
>> I looked at the kafka’s log dir and I only have 3318 files.
>> 
>> I’m doing some tests to see how many topics/partitions I can have, but it 
>> starts throwing too many open files once it hits 15k topics.
>> 
>> Any thoughts?
>> 
>> 
>> 
>>> On Jul 29, 2016, at 10:33 PM, Gwen Shapira  wrote:
>>> 
>>> woah, it looks like you have 15,000 replicas per broker?
>>> 
>>> You can go into the directory you configured for kafka's log.dir and
>>> see how many files you have there. Depending on your segment size and
>>> retention policy, you could have hundreds of files per partition
>>> there...
>>> 
>>> Make sure you have at least that many file handles and then also add
>>> handles for the client connections.
>>> 
>>> 1 million file handles sound like a lot, but you are running lots of
>>> partitions per broker...
>>> 
>>> We normally don't see more than maybe 4000 per broker and most
>>> clusters have a lot fewer, so consider adding brokers and spreading
>>> partitions around a bit.
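>>> Concretely, something like this per broker (a sketch; the log directory
>>> path and the 9092 listener port are assumptions):
>>> 
>>>     # open log segments + index files under the data directory
>>>     find /data/kafka-logs -type f | wc -l
>>> 
>>>     # plus the established client/replica connections on the broker port
>>>     ss -tn state established '( sport = :9092 )' | wc -l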
>>> 
>>> Gwen
>>> 
>>> On Fri, Jul 29, 2016 at 12:00 PM, Kessiler Rodrigues
>>>  wrote:
 Hi guys,
 
 I have been experiencing some issues on kafka, where it's throwing too many 
 open files.
 
 I have around 6k topics with 5 partitions each.
 
 My cluster was made up of 6 brokers. All of them are running Ubuntu 16 and 
 the file limit settings are:
 
 `cat  /proc/sys/fs/file-max`
 200
 
 `ulimit -n`
 100
 
 Anyone has experienced it before?
> 



Re: Debugging high log flush latency on a broker.

2015-09-22 Thread Steve Miller
   There may be more elegant ways to do this, but I'd think that you could just 
ls all the directories specified in log.dirs in your server.properties file for 
Kafka.  You should see directories for each topicname-partitionnumber there.
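Something like this, give or take (a sketch; the server.properties path is an
assumption):

    # list the topicname-partitionnumber directories under every log.dirs entry
    for d in $(grep '^log.dirs' /opt/kafka/config/server.properties | cut -d= -f2 | tr ',' ' '); do
        ls -d "$d"/*-*
    done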

   Offhand it sounds to me like maybe something's evicting pages from the 
buffer cache from time to time, causing Kafka to do a lot more I/O all of a 
sudden than usual.  Why that happens, I don't know, but that'd be my guess: 
either something needs more pages for applications all of a sudden, or like you 
said, there's some characteristic of the traffic for the partitions on this 
broker that isn't the same as it is for all the other brokers.

   Filesystem type and creation parameters are the same as on the other hosts?  
sysctl stuff all tuned the same way (assuming this is Linux, that is)?
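A quick way to compare the usual suspects is to run the same few commands on
each broker and diff the output (a sketch; adjust the mount point):

    sysctl vm.dirty_ratio vm.dirty_background_ratio vm.dirty_expire_centisecs vm.swappiness
    df -T /data            # same filesystem type everywhere?
    iostat -dxm 5 3        # is this broker's disk busier or slower than its peers?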

   Any chance there's some sort of network hiccup that makes some follower get 
a little behind, and then the act of it trying to catch back up pushes the I/O 
past what it can sustain steady-state?  (If something gets significantly 
behind, depending on the size of your buffer cache relative to the retention in 
your topics, you could have something, say, start reading from the first offset 
in that topic and partition, which might well require going to disk rather than 
being satisfied from the buffer cache.  I could see that slowing I/O enough, if 
it's on the edge otherwise, that now you can't keep up with the write rate 
until that consumer gets caught up.)

   The other idea would be that, I dunno, maybe there's a topic where the segment 
size is different, and so when it goes to delete a segment it's spending a lot 
more time putting blocks from that file back onto the filesystem free list (or 
whatever data structure it is these days (-: ).

-Steve

On Tue, Sep 22, 2015 at 11:46:49AM -0700, Rajiv Kurian wrote:
> Also any hints on how I can find the exact topic/partitions assigned to
> this broker? I know in ZK we can see the partition -> broker mapping, but I
> am looking for a broker -> partition mapping. I can't be sure if the load
> that is causing this problem is because of leader traffic or follower
> traffic. What is weird is that I rarely if ever see other brokers in the
> cluster have the same problem. With 3 way replication (leader + 2 replicas)
> I'd imagine that the same work load would cause problems on other brokers
> too.
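One way to get a broker -> partition view without walking ZK by hand is to
filter the describe output on the broker id (a sketch; the ZK address and the
broker id 42 are placeholders):

    # partitions this broker leads (broker id 42 here)
    /opt/kafka/bin/kafka-topics.sh --describe --zookeeper zkhost:2181 | grep -w 'Leader: 42'
    # partitions it hosts at all -- grep for the id anywhere on the line and eyeball the result
    /opt/kafka/bin/kafka-topics.sh --describe --zookeeper zkhost:2181 | grep -w 42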


Disaster Recovery (was: Re: Suggestions when all replicas of a partition are dead?)

2015-08-08 Thread Steve Miller
   I could go either way.  I think that if ZK is up, then Kafka's going to go 
crazy trying to figure out who's the master of what, but maybe I'm not thinking 
the problem through clearly.

   That does beg the issue: it seems like it'd be good to have something 
written down somewhere to say how one should do a whole-cluster shutdown if one 
has to, and how one should recover from a surprise whole-cluster shutdown 
(e.g., someone hits the emergency-power-off button) should one happen.  It 
seems like in any long-lived Kafka cluster, that's going to happen 
*eventually*; even if the answer is "you're doomed, start over", at least that's 
documented and it can be worked into the plan.

   If one has an emergency power-off, or needs to shut it all down and bring it 
all back up, what *should* be the order of operations, then?

   In my particular situation, I think that what would have happened is that 
once we brought the rebuilt hosts back into the cluster, they'd have recreated 
the relevant partitions -- with no data, of course -- and negotiated who's the 
leader and other than the data loss, we might be OK.

   I'm not *as* clear though on what happens to the offsets for that partition 
in that scenario.  Would Kafka fish up the last offset for those partitions 
from ZK and start there?  Or would the offset for those partitions be reset to 
zero?

   If it's reset to 0, I could see much client wackiness, as clients say things 
like "message 1000? pah! I don't need that, my offset is 635213513516!", 
leading to desperation moves like changing group IDs or poking Zookeeper in the 
eye from zookeeper-client.

   What's supposed to happen there?

   Finally, one thing that *is* clear is that messing around with the topics 
while things are in this sort of deranged state leads to tears.  We tried to do 
some things like delete _schemas before the broken hosts were repaved and 
brought back online, and at that point nothing I could do seemed to restore 
_schemas to functioning.

   The deletion didn't seem to happen.

   The partition data in ZK ended up being completely missing.

   None of the brokers seemed to want to forget about the metadata for that 
topic because they'd all decided they weren't the leader.  Attempts at getting 
them to redo leader election didn't seem to make a difference.

   Restarting the brokers (doing a rolling restart, with 10 minutes in between 
in case things needed to do replication -- which they shouldn't, we'd cut off 
the inbound data feeds!) just ended up with the fun of 
https://issues.apache.org/jira/browse/KAFKA-1554.

   Stopping all the brokers at once, deleting the /admin/delete_topics/_schemas 
and /brokers/topics/_schemas keys from ZK, deleting any 10485760-byte-sized 
index files just in case, deleting the directories for _schemas-0 everywhere, 
and starting everything again, seems to have resulted in a completely unstable, 
unusable, cluster, with the same error from KAFKA-1554, but with index files 
that aren't the usual 10485760-byte junk size.
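(For reference, the by-hand ZK surgery described above amounts to something
like the following -- zookeeper-shell.sh ships with Kafka, the connect string is
a placeholder, and as noted it did not actually rescue this cluster:)

    bin/zookeeper-shell.sh zkhost:2181 rmr /admin/delete_topics/_schemas
    bin/zookeeper-shell.sh zkhost:2181 rmr /brokers/topics/_schemas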

   I figure we'll pave it and start over but I think it'd be useful (not just 
to me) to have a better idea of the failure states here and how to recover from 
them.

-Steve

On Fri, Aug 07, 2015 at 08:36:28PM +, Daniel Compton wrote:
 I would have thought you'd want ZK up before Kafka started, but I don't
 have any strong data to back that up.
 On Sat, 8 Aug 2015 at 7:59 AM Steve Miller st...@idrathernotsay.com wrote:
 
 So... we had an extensive recabling exercise, during which we had to
  shut down and derack and rerack a whole Kafka cluster.  Then when we
  brought it back up, we discovered the hard way that two hosts had their
  "rebuild on reboot" flag set in Cobbler.
 
 Everything on those hosts is gone as a result, of course.  And a total
  of four partitions had their primary and their replica on the two hosts
  that were nuked.
 
 This isn't the end of the world, in some sense: it's annoying, but
  that's why we did this now before we brought the cluster into real
  production rather than being in a pre-production state.  The data is all
  transient anyway (well, except for _schemas, of course, which in accordance
  with Murphy's law was one of the topics affected, but we have that mirrored
  elsewhere).
 
 Still, if there's an obvious way to recover from this, I couldn't find
  it googling around for a while.
 
 What's the recommended approach here?  Do we need to delete these
  topics and start over?  Do we need to delete *everything* and start over?
 
 (Also, other than "don't do that!", what's the recommended way to deal
  with the situation where you need to take a whole cluster down all at
  once?  Any order of operations related to how you shut down all the Kafka
  nodes, especially WRT how you shut down Zookeeper?  We deliberately brought
  Kafka up first *without* ZK, then brought up ZK, so that the brokers
  wouldn't go nuts with leader election and the like, which seemed

Suggestions when all replicas of a partition are dead?

2015-08-07 Thread Steve Miller
   So... we had an extensive recabling exercise, during which we had to shut 
down and derack and rerack a whole Kafka cluster.  Then when we brought it back 
up, we discovered the hard way that two hosts had their "rebuild on reboot" 
flag set in Cobbler.

   Everything on those hosts is gone as a result, of course.  And a total of 
four partitions had their primary and their replica on the two hosts that were 
nuked.

   This isn't the end of the world, in some sense: it's annoying, but that's 
why we did this now before we brought the cluster into real production rather 
than being in a pre-production state.  The data is all transient anyway (well, 
except for _schemas, of course, which in accordance with Murphy's law was one of 
the topics affected, but we have that mirrored elsewhere).

   Still, if there's an obvious way to recover from this, I couldn't find it 
googling around for a while.

   What's the recommended approach here?  Do we need to delete these topics and 
start over?  Do we need to delete *everything* and start over?

   (Also, other than "don't do that!", what's the recommended way to deal with 
the situation where you need to take a whole cluster down all at once?  Any 
order of operations related to how you shut down all the Kafka nodes, 
especially WRT how you shut down Zookeeper?  We deliberately brought Kafka up 
first *without* ZK, then brought up ZK, so that the brokers wouldn't go nuts 
with leader election and the like, which seemed to make sense, FWIW.)

-Steve


Re: kafka-python message offset?

2015-07-29 Thread Steve Miller
   Are you using mumrah/kafka-python?  I think so from context but I know 
there's at least one other implementation rattling around these days. (-:

   If that's what you're using, I can see two potential problems you might be 
having.  You can set the offset to some approximation of wherever you want, by 
using:

s_consumer.seek(offset, whence)

pydoc kafka.consumer says:

 |  seek(self, offset, whence)
 |  Alter the current offset in the consumer, similar to fseek
 |  
 |  offset: how much to modify the offset
 |  whence: where to modify it from
 |  0 is relative to the earliest available offset (head)
 |  1 is relative to the current offset
 |  2 is relative to the latest known offset (tail)

   So if you want to go to the beginning, do seek(0, 0).

   Seeking to the beginning or the end should be pretty reliable.  You can seek 
(say) 100K messages relative to the beginning or the end or to the current 
offset, but with partitions and message arrival order and the like it's a bit 
of a crapshoot where you'll end up.  The API will divide your offset by the 
number of partitions, then (I believe) apply that delta to each partition.  
Hoping that the input stream is relatively well balanced.

   But again, seek(0, 0) or seek(0, 2) has always worked reasonably for me, so 
much so that I don't remember the default.

   The other thing you might be running into is that if you are setting the 
partitions parameter to an array containing only zero, and your topic has more 
partitions than just partition #0, the producer might be publishing to a 
different partition.  But you've told the client to read only from partition 0, 
so in that case you'd see no data.  If you want to consume from every 
partition, don't pass in a partitions parameter.

-Steve

On Wed, Jul 29, 2015 at 04:07:33PM +, Keith Wiley wrote:
 I'm still not getting the necessary behavior.  If I run on the command line, 
 I get a series of messages:
 
 
 $ ./kafka-console-consumer.sh --zookeeper w.x.y.z:p --topic test 
 --from-beginning
 Test
 test
 tests
 asdf
 
 
 If I exclude the --from-beginning argument then it hangs, which indicates to 
 me that the offset is currently at the end and awaiting new messages. If I 
 run through python, it also hangs, which is why I suspect it is insistently 
 reading from the end.  See below:
 
 
 print "Begin constructing SimpleConsumer"
 client = KafkaClient(servers)
 s_consumer = SimpleConsumer(client,
                             topic,
                             group_id,
                             partitions=[0],                # Not sure I need this
                             auto_commit=False,             # Tried with and without, doesn't fix the problem
                             auto_offset_reset='smallest'   # Tried with and without, doesn't fix the problem
                             )
 print "End constructing SimpleConsumer\n"
 
 print "Begin reading messages"
 try:
     for message in s_consumer:
         print "  New message"
         print "    +", message.topic
         print "    +", message.partition
         print "    +", message.offset
         print "    +", message.key
         print "    +", message.value
 except Exception as e:
     print "Exception: ", e
 print "End reading messages\n"
 
 print "End all"
 
 
 Output:
 
 Begin all
 
 Begin constructing SimpleConsumer
 End constructing SimpleConsumer
 
 Begin reading messages
 
 
 It just hangs after that.  I also tried with a KafkaConsumer instead of a 
 SimpleConsumer and it does exactly the same thing.  I'm not sure what to do.
 
 
 Keith Wiley
 Senior Software Engineer, Atigeo
 keith.wi...@atigeo.com
 
 
 
 
 
 
 From: Dana Powers dana.pow...@rd.io
 Sent: Tuesday, July 28, 2015 09:58 PM
 To: users@kafka.apache.org
 Subject: Re: kafka-python message offset?
 
 Hi Keith,
 
 you can use the `auto_offset_reset` parameter to kafka-python's
 KafkaConsumer. It behaves the same as the java consumer configuration of
 the same name. See
 http://kafka-python.readthedocs.org/en/latest/apidoc/kafka.consumer.html#kafka.consumer.KafkaConsumer.configure
 for more details on how to configure a KafkaConsumer instance.
 
 For fine-grained control wrt configuring topic/partition offsets, use
 KafkaConsumer.set_topic_partitions() . For the most control, pass a
 dictionary of {(topic, partition): offset, ...} .
 see
 http://kafka-python.readthedocs.org/en/latest/apidoc/kafka.consumer.html#kafka.consumer.kafka.KafkaConsumer.set_topic_partitions
 
 -Dana
 
 On Tue, Jul 28, 2015 at 8:47 PM, Keith Wiley keith.wi...@atigeo.com wrote:
 
  I haven't found a way to specify that a consumer should read from the
  beginning, or from any other explicit 

When in-sync isn't in sync?

2015-05-04 Thread Steve Miller
[ BTW, after some more research, I think what might be happening here is that 
we had some de-facto network partitioning happen as a side-effect of us 
renaming some network interfaces, though if that's the case, I'd like to know 
how to get everything back into sync. ]

   Hi.  I'm seeing something weird, where if I do a MetadataRequest, what I get 
back says I have out-of-sync replicas... but if I use kafka-topics.sh, it says I 
don't.  I'm running Kafka 0.8.1.1, still, for the moment, on Java 1.7.0_55.

   The code I have to do this uses kafka-python:

==
#!/usr/bin/python

import logging
import signal
import sys

# Should use argparse, but we shouldn't use python 2.6, either...
from optparse import OptionParser

import simplejson as json

from kafka.client import KafkaClient
from kafka.protocol import KafkaProtocol

#logging.basicConfig(level=logging.DEBUG)

def main():
    parser = OptionParser()
    parser.add_option('-t', '--topic', dest='topic',
                      help='topic to which we should subscribe', default='mytopic')
    parser.add_option('-b', '--broker', dest='kafkaHost',
                      help='Kafka broker to which we should connect',
                      default='host309-ilg1.rtc.vrsn.com')

    (options, args) = parser.parse_args()

    kafka = KafkaClient('%s:9092' % options.kafkaHost)

    # WARNING: terrible abuse of private methods follows.

    id = kafka._next_id()

    request = KafkaProtocol.encode_metadata_request(kafka.client_id, id)
    response = kafka._send_broker_unaware_request(id, request)

    (brokers, topics) = KafkaProtocol.decode_metadata_response(response)

    if options.topic != '*':
        topics_we_want = [options.topic]
    else:
        topics_we_want = sorted(topics.keys())

    for topic in topics_we_want:
        for partition in sorted(topics[topic].keys()):
            meta = topics[topic][partition]
            delta = set(meta.replicas) - set(meta.isr)
            if len(delta) == 0:
                print 'topic', topic, 'partition', partition, 'leader', meta.leader, 'replicas', meta.replicas, 'isr', meta.isr
            else:
                print 'topic', topic, 'partition', partition, 'leader', meta.leader, 'replicas', meta.replicas, 'isr', meta.isr, 'OUT-OF-SYNC', delta

    sys.exit(0)

if __name__ == '__main__':
    #logging.basicConfig(level=logging.DEBUG)
    main()
==

And if I run that against mytopic, I get:

topic mytopic partition 0 leader 311 replicas (311, 323) isr (311, 323)
topic mytopic partition 1 leader 323 replicas (323, 312) isr (312, 323)
topic mytopic partition 2 leader 324 replicas (324, 313) isr (324, 313)
topic mytopic partition 3 leader 309 replicas (309, 314) isr (314, 309)
topic mytopic partition 4 leader 315 replicas (310, 315) isr (315,) OUT-OF-SYNC 
set([310])
topic mytopic partition 5 leader 311 replicas (311, 316) isr (311, 316)
topic mytopic partition 6 leader 312 replicas (312, 317) isr (317, 312)
topic mytopic partition 7 leader 318 replicas (313, 318) isr (318, 313)
topic mytopic partition 8 leader 314 replicas (314, 319) isr (314, 319)
topic mytopic partition 9 leader 315 replicas (315, 320) isr (320, 315)
topic mytopic partition 10 leader 316 replicas (316, 321) isr (316, 321)
topic mytopic partition 11 leader 317 replicas (317, 322) isr (317, 322)
topic mytopic partition 12 leader 318 replicas (318, 323) isr (318, 323)
topic mytopic partition 13 leader 324 replicas (319, 324) isr (324,) 
OUT-OF-SYNC set([319])
topic mytopic partition 14 leader 320 replicas (320, 309) isr (320, 309)
topic mytopic partition 15 leader 321 replicas (321, 310) isr (321,) 
OUT-OF-SYNC set([310])
topic mytopic partition 16 leader 312 replicas (312, 320) isr (312, 320)
topic mytopic partition 17 leader 323 replicas (323, 313) isr (323, 313)
topic mytopic partition 18 leader 324 replicas (324, 314) isr (314, 324)
topic mytopic partition 19 leader 309 replicas (309, 315) isr (309, 315)

but if I do:

/opt/kafka/bin/kafka-topics.sh --describe --zookeeper host301:2181 --topic 
mytopic

I get:

Topic:mytopic   PartitionCount:20   ReplicationFactor:2 Configs:retention.bytes=1000
    Topic: mytopic  Partition: 0    Leader: 311 Replicas: 311,323   Isr: 311,323
    Topic: mytopic  Partition: 1    Leader: 323 Replicas: 323,312   Isr: 312,323
    Topic: mytopic  Partition: 2    Leader: 324 Replicas: 324,313   Isr: 324,313
    Topic: mytopic  Partition: 3    Leader: 309 Replicas: 309,314   Isr: 314,309
    Topic: mytopic  Partition: 4    Leader: 315 Replicas: 310,315   Isr: 315,310
    Topic: mytopic  Partition: 5    Leader: 311 Replicas: 311,316   Isr: 311,316
    Topic: mytopic  Partition: 6    Leader: 312 Replicas: 312,317   Isr: 317,312
    Topic: mytopic  Partition: 7    Leader: 318 Replicas: 313,318   Isr: 318,313
    Topic: mytopic  Partition: 8    Leader: 314 Replicas: 314,319   Isr: 314,319
    Topic: mytopic  Partition: 9

Re: [DISCUSS] KIP-14 Tools Standardization

2015-04-09 Thread Steve Miller
FWIW I like the standardization idea but just making the old switches fail 
seems like it's not the best plan.  People wrap this sort of thing for any 
number of reasons, and breaking all of their stuff all at once is not going to 
make them happy.  And it's not like keeping the old switches working for a 
while is all that challenging from a technical standpoint.

Even if all this does is break stuff when you finally phase out the old 
switches, telling people that will happen and giving them time to adjust will 
make them a lot less annoyed with the Kafka community when that happens.  They 
may still be annoyed, mind you, just not at you.  (-:

-Steve



 On Apr 8, 2015, at 10:56 PM, Matthew Warhaftig mwarhaf...@gmail.com wrote:
 
 The Tool Standardization KIP that Jiangjie started has been updated to 
 contain proposal details:
 
 https://cwiki.apache.org/confluence/display/KAFKA/KIP-14+-+Tools+Standardization
 
 Any feedback is appreciated.
 
 Thanks,
 Matt


Re: The purpose of key in kafka

2014-12-19 Thread Steve Miller
   Also, if log.cleaner.enable is true in your broker config, that enables the 
log-compaction retention strategy.

   Then, for topics with the per-topic cleanup.policy=compact config 
parameter set, Kafka will scan the topic periodically, nuking old versions of 
the data with the same key.
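A minimal sketch of wiring that up (the topic name and ZK address are
placeholders): set log.cleaner.enable=true in server.properties, then create
the topic with the compact policy:

    bin/kafka-topics.sh --create --zookeeper zkhost:2181 \
        --topic my-keyed-topic --partitions 8 --replication-factor 2 \
        --config cleanup.policy=compact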

   I seem to remember that there's some trickiness here, it's not that you're 
absolutely guaranteed to have just one message there with the same key, it's 
just that you'll always have at least one with that key.  I think that depends 
a bit on how big the segments are and how often you're configured to purge old 
log data and that sort of thing.  The idea is that you could have long-term 
persistent data stored within a topic without it getting out of control.

   But in any case, that's another thing that the keys can be useful for.

   It's been six months or so since I tried that so the details are a bit 
fuzzy, but it's something like that, at least.

-Steve

On Fri, Dec 19, 2014 at 01:04:36PM -0800, Rajiv Kurian wrote:
 Thanks, didn't know that.
 
 On Fri, Dec 19, 2014 at 10:39 AM, Jiangjie Qin j...@linkedin.com.invalid
 wrote:
 
  Hi Rajiv,
 
  You can send messages without keys. Just provide null for key.
 
  Jiangjie (Becket) Qin
 
 
  On 12/19/14, 10:14 AM, Rajiv Kurian ra...@signalfuse.com wrote:
 
  Hi all,
  
  I was wondering why every ProducerRecord sent requires a serialized
  key. I am using Kafka to send opaque bytes and I am ending up creating
  garbage keys because I don't really have a good one.
  
  Thanks,
  Rajiv
 
 


Re: Keep on getting kafka.common.OffsetOutOfRangeException: Random times

2014-08-20 Thread Steve Miller
That seems likely.  I'd try either catching the exception and resetting the 
offset, or upping log.retention.hours.  I'd be interested in hearing if that 
fixes the problem.
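If it helps narrow it down, you can also ask the broker what offset range it
still has for the topic and compare that with the offset the consumer is
requesting (a sketch; broker address and topic name are placeholders):

    # earliest offsets still on disk (-2) and latest offsets (-1), per partition
    bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list broker1:9092 --topic mytopic --time -2
    bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list broker1:9092 --topic mytopic --time -1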

-Steve

 On Aug 19, 2014, at 11:54 PM, pradeep.si...@wipro.com wrote:
 
 Thank you for your reply. Oh, does retention hours have an effect on this? I 
 didn't know that. I have log.retention.hours set to 1, and during development 
 we test this once every 15 mins or every hour or two. So do you think this is 
 causing the issue?
 
 Thanks,
 Pradeep Simha
 Technical Lead
 
 -Original Message-
 From: Steve Miller [mailto:st...@idrathernotsay.com] 
 Sent: Tuesday, August 19, 2014 6:13 PM
 To: users@kafka.apache.org
 Subject: Re: Keep on getting kafka.common.OffsetOutOfRangeException: Random 
 times
 
   Also, what do you have log.retention.hours set to?  How often do you 
 publish messages?
 
   I can envision a scenario in which you don't publish to a topic often, and 
 in fact publish so infrequently that everything in the topic ages out from 
 log.retention.hours first.
 
   I don't know exactly what happens should that occur, but I've seen some 
 stuff that makes me think that the offsets might go back to zero -- or maybe 
 they do if the broker restarts, so you might check to be sure that's not 
 happening.
 
   From what I've seen in that regard, I've been wondering if part of the way 
 most long-running Kafka consumers shouldn't be designed is to catch that 
 exception and either set their offset to the first available message or the 
 last available message, depending on whether their priority is to get every 
 message or if it's to get the most recent messages.  Though in that scenario 
 maybe it's that the first and last messages are the same by definition since 
 there aren't any messages left in the topic. (-:
 
   It's also possible that the specific topic weirdness that my specific 
 installation has been running into is causing that and it only happens for 
 the stuff I work on, so definitely take this with a grain of salt, I'm no 
 expert, just relating the local folklore.
 
-Steve
 
 On Tue, Aug 19, 2014 at 09:12:30AM +, pradeep.si...@wipro.com wrote:
 Hi Team,
 
 Can someone please help me with this? This is really becoming a road block to 
 our project; we should decide whether to continue to use Kafka or some other 
 project, as it is becoming too unstable.
 
 Thanks,
 Pradeep Simha
 Technical Lead
 
 -Original Message-
 From: pradeep.si...@wipro.com [mailto:pradeep.si...@wipro.com]
 Sent: Tuesday, August 19, 2014 9:30 AM
 To: users@kafka.apache.org
 Subject: RE: Keep on getting kafka.common.OffsetOutOfRangeException: 
 Random times
 
 Hi Neha,
 
 Yes, I am using the latest version ie (0.8.1.1).
 
 Hi Guozhang,
 
 These are the values:
 
 #log.retention.bytes= 1073741824 (Yes, this was commented by default)
 
 log.retention.check.interval.ms=6
 
 Am I doing anything wrong here? Since I haven't touched this properties file.
 
 Thanks,
 Pradeep Simha
 Technical Lead
 
 -Original Message-
 From: Neha Narkhede [mailto:neha.narkh...@gmail.com]
 Sent: Tuesday, August 19, 2014 2:27 AM
 To: users@kafka.apache.org
 Subject: Re: Keep on getting kafka.common.OffsetOutOfRangeException: 
 Random times
 
 Also, what version of Kafka are you using? 0.8.1.1 is the latest most stable 
 version.
 
 
 On Mon, Aug 18, 2014 at 9:36 AM, Guozhang Wang wangg...@gmail.com wrote:
 
 Hi Pradeep,
 
 It seems your logs gets deleted due to retention policy. Could you 
 check the config values for log.retention.bytes and 
 log.retention.check.interval.ms?
 
 http://kafka.apache.org/documentation.html#brokerconfigs
 
 Guozhang
 
 
 On Mon, Aug 18, 2014 at 5:49 AM, pradeep.si...@wipro.com wrote:
 
 Hi Team,
 
 Of late I am facing strange issue w.r.t Kafka. Random times I keep 
 on getting these strange errors while consuming the topic:
 
 
 kafka.common.OffsetOutOfRangeException: Request for offset 19 but 
 we only have log segments in the range 0 to 0.
 Sometimes I get like this:
 
 
 kafka.common.OffsetOutOfRangeException: Request for offset 19 but 
 we only have log segments in the range 19 to 22.
 
 That number keeps on changing (with random ranges). I don't know 
 what is the problem here. Both producer and consumer will work 
 perfectly, but I keep on getting these errors randomly. In that 
 situation if I clear the logs, remove the broker again it starts working 
 fine again.
 
 Can anyone please help me in this regard? This is affecting our 
 application stability, if any more information required I can 
 provide,
 also
 we are using only the defaults provided by the kafka we didn't 
 changed
 any
 settings.
 
 Thanks,
 Pradeep Simha
 Technical Lead
 

Re: Keep on getting kafka.common.OffsetOutOfRangeException: Random times

2014-08-20 Thread Steve Miller
. Kafka is not magic, it can only do what it's told.
 
 In practise I have found compression to be a big help -- big savings on disk 
 space.
 
 
 Philip
 
 -
 http://www.philipotoole.com 
 
 
 On Wednesday, August 20, 2014 1:42 AM, pradeep.si...@wipro.com 
 pradeep.si...@wipro.com wrote:
  
 
 
 Sure, I will try setting longer retention hours. But I feel this would 
 not be a good approach -- should we raise it as a bug?
 
 Thanks,
 Pradeep Simha
 Technical Lead
 
 -Original Message-
 From: Manjunath Shivakumar [mailto:manjunath.shivaku...@betfair.com]
 Sent: Wednesday, August 20, 2014 1:31 PM
 To: users@kafka.apache.org
 Subject: RE: Keep on getting kafka.common.OffsetOutOfRangeException: Random 
 times
 
 We had a similar issue in our dev environments, where we had to configure 
 aggressive log retention to save space.
 And the clients kept failing with this error on Mondays, because the messages 
 from Friday had been deleted.
 Perhaps compaction would help in this scenario too?
 https://cwiki.apache.org/confluence/display/KAFKA/Log+Compaction
 
 
 From: Steve Miller [st...@idrathernotsay.com]
 Sent: 20 August 2014 08:47
 To: users@kafka.apache.org
 Subject: Re: Keep on getting kafka.common.OffsetOutOfRangeException: Random 
 times
 
 That seems likely.  I'd try either catching the exception and resetting the 
 offset, or upping log.retention.hours.  I'd be interested in hearing if that 
 fixes the problem.
 
     -Steve
 
  On Aug 19, 2014, at 11:54 PM, pradeep.si...@wipro.com wrote:
 
  Thank you for your reply. Oh is retention hours have affect on this? I 
  didn't knew this. I have log.retention.hours set to 1, and during 
  development we test this once a 15 mins or hour or 2. So do you think this 
  is causing the issue?
 
  Thanks,
  Pradeep Simha
  Technical Lead
 
  -Original Message-
  From: Steve Miller [mailto:st...@idrathernotsay.com]
  Sent: Tuesday, August 19, 2014 6:13 PM
  To: users@kafka.apache.org
  Subject: Re: Keep on getting kafka.common.OffsetOutOfRangeException:
  Random times
 
    Also, what do you have log.retention.hours set to?  How often do you 
 publish messages?
 
    I can envision a scenario in which you don't publish to a topic often, 
 and in fact publish so infrequently that everything in the topic ages out 
 from log.retention.hours first.
 
    I don't know exactly what happens should that occur, but I've seen some 
 stuff that makes me think that the offsets might go back to zero -- or maybe 
 they do if the broker restarts, so you might check to be sure that's not 
 happening.
 
    From what I've seen in that regard, I've been wondering if part of the 
 way most long-running Kafka consumers shouldn't be designed is to catch that 
 exception and either set their offset to the first available message or the 
 last available message, depending on whether their priority is to get every 
 message or if it's to get the most recent messages.  Though in that scenario 
 maybe it's that the first and last messages are the same by definition since 
 there aren't any messages left in the topic. (-:
 
    It's also possible that the specific topic weirdness that my specific 
 installation has been running into is causing that and it only happens for 
 the stuff I work on, so definitely take this with a grain of salt, I'm no 
 expert, just relating the local folklore.
 
     -Steve
 
  On Tue, Aug 19, 2014 at 09:12:30AM +, pradeep.si...@wipro.com wrote:
  Hi Team,
 
  Can someone please help me in this? This is really becoming road block to 
  our project we should decide whether to continue to use Kafka or some 
 other project as it is becoming too much of unstable.
 
  Thanks,
  Pradeep Simha
  Technical Lead
 
  -Original Message-
  From: pradeep.si...@wipro.com [mailto:pradeep.si...@wipro.com]
  Sent: Tuesday, August 19, 2014 9:30 AM
  To: users@kafka.apache.org
  Subject: RE: Keep on getting kafka.common.OffsetOutOfRangeException:
  Random times
 
  Hi Neha,
 
  Yes, I am using the latest version ie (0.8.1.1).
 
  Hi Guozhang,
 
  These are the values:
 
  #log.retention.bytes= 1073741824 (Yes, this was commented by default)
 
  log.retention.check.interval.ms=6
 
  Am I doing anything wrong here? Since I haven't touched this properties 
  file.
 
  Thanks,
  Pradeep Simha
  Technical Lead
 
  -Original Message-
  From: Neha Narkhede [mailto:neha.narkh...@gmail.com]
  Sent: Tuesday, August 19, 2014 2:27 AM
  To: users@kafka.apache.org
  Subject: Re: Keep on getting kafka.common.OffsetOutOfRangeException:
  Random times
 
  Also, what version of Kafka are you using? 0.8.1.1 is the latest most 
  stable version.
 
 
  On Mon, Aug 18, 2014 at 9:36 AM, Guozhang Wang wangg...@gmail.com wrote:
 
  Hi Pradeep,
 
  It seems your logs gets deleted due to retention policy. Could you
  check the config values for log.retention.bytes

Re: Keep on getting kafka.common.OffsetOutOfRangeException: Random times

2014-08-19 Thread Steve Miller
   Also, what do you have log.retention.hours set to?  How often do you publish 
messages?

   I can envision a scenario in which you don't publish to a topic often, and 
in fact publish so infrequently that everything in the topic ages out from 
log.retention.hours first.

   I don't know exactly what happens should that occur, but I've seen some 
stuff that makes me think that the offsets might go back to zero -- or maybe 
they do if the broker restarts, so you might check to be sure that's not 
happening.

   From what I've seen in that regard, I've been wondering if part of the way 
most long-running Kafka consumers shouldn't be designed is to catch that 
exception and either set their offset to the first available message or the 
last available message, depending on whether their priority is to get every 
message or if it's to get the most recent messages.  Though in that scenario 
maybe it's that the first and last messages are the same by definition since 
there aren't any messages left in the topic. (-:

   It's also possible that the specific topic weirdness that my specific 
installation has been running into is causing that and it only happens for the 
stuff I work on, so definitely take this with a grain of salt, I'm no expert, 
just relating the local folklore.

-Steve

On Tue, Aug 19, 2014 at 09:12:30AM +, pradeep.si...@wipro.com wrote:
 Hi Team,
 
 Can someone please help me in this? This is really becoming road block to our 
 project we should decide whether to continue to use Kafka or some other 
 project as it is becoming  too much of unstable.
 
 Thanks,
 Pradeep Simha
 Technical Lead
 
 -Original Message-
 From: pradeep.si...@wipro.com [mailto:pradeep.si...@wipro.com] 
 Sent: Tuesday, August 19, 2014 9:30 AM
 To: users@kafka.apache.org
 Subject: RE: Keep on getting kafka.common.OffsetOutOfRangeException: Random 
 times
 
 Hi Neha,
 
 Yes, I am using the latest version ie (0.8.1.1).
 
 Hi Guozhang,
 
 These are the values:
 
 #log.retention.bytes= 1073741824 (Yes, this was commented by default)
 
 log.retention.check.interval.ms=6
 
 Am I doing anything wrong here? Since I haven't touched this properties file.
 
 Thanks,
 Pradeep Simha
 Technical Lead
 
 -Original Message-
 From: Neha Narkhede [mailto:neha.narkh...@gmail.com]
 Sent: Tuesday, August 19, 2014 2:27 AM
 To: users@kafka.apache.org
 Subject: Re: Keep on getting kafka.common.OffsetOutOfRangeException: Random 
 times
 
 Also, what version of Kafka are you using? 0.8.1.1 is the latest most stable 
 version.
 
 
 On Mon, Aug 18, 2014 at 9:36 AM, Guozhang Wang wangg...@gmail.com wrote:
 
  Hi Pradeep,
 
  It seems your logs gets deleted due to retention policy. Could you 
  check the config values for log.retention.bytes and 
  log.retention.check.interval.ms?
 
  http://kafka.apache.org/documentation.html#brokerconfigs
 
  Guozhang
 
 
  On Mon, Aug 18, 2014 at 5:49 AM, pradeep.si...@wipro.com wrote:
 
   Hi Team,
  
   Of late I am facing strange issue w.r.t Kafka. Random times I keep 
   on getting these strange errors while consuming the topic:
  
  
   kafka.common.OffsetOutOfRangeException: Request for offset 19 but we 
   only have log segments in the range 0 to 0.
   Sometimes I get like this:
  
  
   kafka.common.OffsetOutOfRangeException: Request for offset 19 but we 
   only have log segments in the range 19 to 22.
  
   That number keeps on changing (with random ranges). I don't know 
   what is the problem here. Both producer and consumer will work 
   perfectly, but I keep on getting these errors randomly. In that 
   situation if I clear the logs, remove the broker again it starts working 
   fine again.
  
   Can anyone please help me in this regard? This is affecting our 
   application stability, if any more information required I can 
   provide,
  also
   we are using only the defaults provided by the kafka we didn't 
   changed
  any
   settings.
  
   Thanks,
   Pradeep Simha
   Technical Lead
  
  
 
 
 
  --
  -- Guozhang
 
 
 

Re: Strange topic-corruption issue?

2014-08-17 Thread Steve Miller
   Odd -- I copied and pasted what you'd asked me to run:

/opt/kafka/bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files 
.log

and I just re-ran it and the output looks the same as what I'd put up for 
people to grab.

   I also ran:

/opt/kafka/bin/kafka-run-class.sh kafka.tools.DumpLogSegments --deep-iteration 
--files .log

and the output is identical (I diffed it to be sure).

   The publisher is publishing with compression turned off, though; when we had 
it turned on, I remember seeing some references to Snappy in there.  I'd turned 
compression off on the theory that (at the time) we had one thing producing in 
a way that caused corruption and one thing that was producing using 
compression, and maybe they were linked (but turning compression off didn't 
help).

-Steve

On Fri, Aug 15, 2014 at 04:00:33PM -0700, Jun Rao wrote:
 What's in there seems to be still the output for deep iteration. For
 shallow iteration, the compression codec for each message should be Snappy.
 
 Thanks,
 
 Jun
 
 
 On Fri, Aug 15, 2014 at 5:27 AM, Steve Miller st...@idrathernotsay.com
 wrote:
 
 Oh, yeah, sorry about that.  I threw a copy of that up at:
 
  https://newbie.idrathernotsay.com/full.txt.gz
 
  (you'll get a cert error, for the four times I put something on the home
  website each year, I didn't feel like getting a real cert (-: ).
 
 If that doesn't work I'm sure I can figure something else out.
 
  -Steve
 
  On Thu, Aug 14, 2014 at 05:04:29PM -0700, Neha Narkhede wrote:
   Apache doesn't allow attachments. Could you send maybe a pastebin or
   something?
 


Strange topic-corruption issue?

2014-08-12 Thread Steve Miller
[ "Aha!", you say, "now I know why this guy's been doing so much tshark stuff!" 
(-: ] 

   Hi.  I'm running into a strange situation, in which more or less all of the 
topics on our Kafka server behave exactly as expected... but the data produced 
by one family of applications is producing fairly frequent topic corruption.

   When this happens, on the client side, the results are all over the place: 
sometimes you get a ConsumerFetchSizeTooSmall exception, or an exception for an 
unknown error type, or an invalid-offset error, it's all over the map.

   On the server side, I think something like this is the first sign of badness:

[2014-08-11 21:03:28,121] ERROR [KafkaApi-1] Error processing ProducerRequest 
with correlation id 6750 from client test-producer on partition [mytopic,9] 
(kafka.server.KafkaApis)
java.lang.ArrayIndexOutOfBoundsException
[2014-08-11 21:03:28,121] INFO [KafkaApi-1] Send the close connection response 
due to error handling produce request [clientId = test-producer, correlationId 
= 6750, topicAndPartition = [mytopic,9]] with Ack=0 (kafka.server.KafkaApis)

shortly thereafter, you begin to see oddness facing the clients:

[2014-08-11 21:17:58,132] ERROR [KafkaApi-1] Error when processing fetch 
request for partition [mytopic,9] offset 1327 from consumer with correlation id 
87204 (kafka.server.KafkaApis)
java.lang.IllegalStateException: Invalid message size: 0
at kafka.log.FileMessageSet.searchFor(FileMessageSet.scala:127)
at kafka.log.LogSegment.translateOffset(LogSegment.scala:100)
at kafka.log.LogSegment.read(LogSegment.scala:137)
at kafka.log.Log.read(Log.scala:386)
at 
kafka.server.KafkaApis.kafka$server$KafkaApis$$readMessageSet(KafkaApis.scala:530)
at 
kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$readMessageSets$1.apply(KafkaApis.scala:476)
at 
kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$readMessageSets$1.apply(KafkaApis.scala:471)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at 
scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:224)
at 
scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:403)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at 
kafka.server.KafkaApis.kafka$server$KafkaApis$$readMessageSets(KafkaApis.scala:471)
at kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:437)
at kafka.server.KafkaApis.handle(KafkaApis.scala:186)
at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:42)
at java.lang.Thread.run(Unknown Source)

If I go run the DumpLogSegments tool on the particular topic and partition 
that's generating the errors, I can see there's corruption in the log:

Non-secutive offsets in :/data/d3/kafka/log/mytopic-9/.log
  1327 is followed by 1327
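A quick way to sweep the whole partition for this, rather than one segment at a
time (a sketch, reusing the paths above):

    for f in /data/d3/kafka/log/mytopic-9/*.log; do
        echo "== $f"
        /opt/kafka/bin/kafka-run-class.sh kafka.tools.DumpLogSegments \
            --deep-iteration --files "$f" | grep -i 'secutive\|invalid\|corrupt'
    done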

The only thing producing data to corrupted topics was also the only thing where 
snappy compression was turned on in the Java API being used by the producer 
(it's a Storm topology; we've had the same issue with one in Scala and with one 
that produces very similar data, but that was written in Java).  We turned that 
off, published to a different topic name (so it was created fresh), and had a 
couple of happy days where all was well.  Then we decided that all was well so 
we tried to go back to the original topic -- after we'd verified that all data 
had aged out of the logs for that topic.  And we started seeing errors again.  
So we switched to a different topic again, let it be created, and also started 
seeing errors on that topic.

We have other producers, written in C and Java and python, which are working 
flawlessly, even though the size of the data they produce and the rate at which 
they produce it is much larger than what we're seeing with this one problematic 
producer.  We also have producers written in other languages that produce at 
very low rates, so it's (probably) not the sort of thing where the issue is 
masked by more frequent data production.

But in any case it looks like there's something the client can send that will 
corrupt the topic, which seems like something that shouldn't be able to happen. 
 I know there's at least some error checking for bad protocol requests, as I 
hacked a python client to produce some corrupt messages and saw an error 
response from the server.

I'm happy to supply more data but I'm not sure what would be useful.  I'm also 
fine with continuing to dig into this on my own but I'd reached a point where 
it'd be useful to know if anyone had seen something like this before.  I have a 
ton o' tcpdumps running and some tail -F greps running on the logs so that if 
we see that producer error again we can go find the