Additionally, on the consumer I observe strange behavior: it is constantly rebalancing. There are no errors and each rebalance succeeds, but as soon as one finishes the next one starts.
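For what it's worth, the broker error quoted below ("Request for offset 337055698 but we only have log segments in the range 347392118 to 361407455") means the fetch offset has fallen below the earliest retained offset, so with auto.offset.reset=smallest the consumer would snap forward to the log start; resetting it back manually just triggers the same fallback again, which would explain the back-and-forth. A rough model of that fallback (a sketch only; names like resolveFetchOffset are mine, not Kafka's API):

```java
public class OffsetResetModel {
    // Sketch of auto.offset.reset handling for an out-of-range fetch offset.
    // Illustrative only -- this is not Kafka's actual code or API.
    static long resolveFetchOffset(long requested, long logStart, long logEnd,
                                   String autoOffsetReset) {
        if (requested >= logStart && requested <= logEnd) {
            return requested; // offset still retained, fetch proceeds normally
        }
        // Out of range (e.g. segments deleted by retention): fall back
        if ("smallest".equals(autoOffsetReset)) {
            return logStart;  // jump to earliest retained offset
        }
        return logEnd;        // "largest": jump to the latest offset
    }

    public static void main(String[] args) {
        // Numbers taken from the broker log in the quoted message
        long requested = 337055698L, logStart = 347392118L, logEnd = 361407455L;
        System.out.println(resolveFetchOffset(requested, logStart, logEnd, "smallest"));
        // prints 347392118 -- the consumer leaps forward to the log start
    }
}
```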
On Wed, Oct 19, 2016 at 4:36 PM, Timur Fayruzov <timur.fairu...@gmail.com> wrote:
> Hello,
>
> I run a Kafka 0.8.2.2 cluster with 3 nodes and recently started to observe
> strange behavior on select topics. The cluster runs in-house, as do most
> consumers. I have started some consumers in AWS and they _mostly_ work
> fine. Occasionally I end up in a state where, when I run
> kafka-consumer-offset-checker, I see the offset of one partition going back
> and forth (i.e. it was 1000, then goes to 900, then goes to 1100, etc.).
>
> The Kafka broker that is holding this partition has the following log messages:
>
> {"@timestamp":"2016-10-19T21:00:00.134Z","@service":"kafka",
>  "thread":"kafka-request-handler-2","logger":"kafka.server.ReplicaManager",
>  "@host":"kafka-0","@category":"common",
>  "@msg":"[Replica Manager on Broker 0]: Error when processing fetch request
>  for partition [my_topic,1] offset 337055698 from consumer with correlation
>  id 0. Possible cause: Request for offset 337055698 but we only have log
>  segments in the range 347392118 to 361407455.",
>  "@version":"1","@severity":"ERROR"}
>
> {"@timestamp":"2016-10-19T21:00:00.168Z","@service":"kafka","exception":"
>  java.io.IOException: Broken pipe
>   at sun.nio.ch.FileChannelImpl.transferTo0(Native Method)
>   at sun.nio.ch.FileChannelImpl.transferToDirectly(FileChannelImpl.java:434)
>   at sun.nio.ch.FileChannelImpl.transferTo(FileChannelImpl.java:566)
>   at kafka.log.FileMessageSet.writeTo(FileMessageSet.scala:147)
>   at kafka.api.PartitionDataSend.writeTo(FetchResponse.scala:70)
>   at kafka.network.MultiSend.writeTo(Transmission.scala:101)
>   at kafka.api.TopicDataSend.writeTo(FetchResponse.scala:125)
>   at kafka.network.MultiSend.writeTo(Transmission.scala:101)
>   at kafka.api.FetchResponseSend.writeTo(FetchResponse.scala:231)
>   at kafka.network.Processor.write(SocketServer.scala:472)
>   at kafka.network.Processor.run(SocketServer.scala:342)
>   at java.lang.Thread.run(Thread.java:745)\n",
>  "thread":"kafka-network-thread-6667-0","logger":"kafka.network.Processor",
>  "@host":"kafka0.util.pages","@category":"common",
>  "@msg":"Closing socket for /10.10.10.10 because of error",
>  "@version":"1","@severity":"ERROR"}
>
> The IP above is obscured, but it is the IP of the EC2 node that runs the
> consumer for that partition.
>
> I try to reset the offset for the consumer group at that partition manually
> (I wrote a script for that), but I still see it being reset to a prior point
> (and back). It seems that after a while this behavior goes away and the
> affected partitions have a chance to catch up, but then the whole thing
> repeats.
>
> My consumer configuration is:
>
> "socket.timeout.ms": "60000",
> "zookeeper.session.timeout.ms": "60000",
> "offsets.channel.socket.timeout.ms": "30000",
> "auto.offset.reset": "smallest",
> "offsets.storage": "kafka",
> "consumer.timeout.ms": "1500"
>
> I use the reactive-kafka wrapper; other places where it is used do not have
> these problems.
>
> Please advise what this could be.
>
> Thanks,
> Timur
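For anyone trying to reproduce this, the quoted configuration expressed as the java.util.Properties the 0.8 high-level consumer is built from. A sketch only: zookeeper.connect and group.id are placeholders I added, not values from the original mail.

```java
import java.util.Properties;

public class ConsumerProps {
    // The consumer configuration quoted above, as java.util.Properties.
    // zookeeper.connect and group.id below are placeholders, not from the mail.
    static Properties build() {
        Properties props = new Properties();
        props.put("zookeeper.connect", "zk:2181");  // placeholder
        props.put("group.id", "my-group");          // placeholder
        props.put("socket.timeout.ms", "60000");
        props.put("zookeeper.session.timeout.ms", "60000");
        props.put("offsets.channel.socket.timeout.ms", "30000");
        props.put("auto.offset.reset", "smallest");
        props.put("offsets.storage", "kafka");
        props.put("consumer.timeout.ms", "1500");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(build().getProperty("auto.offset.reset"));
        // prints smallest
    }
}
```

Note that consumer.timeout.ms=1500 makes the consumer iterator throw a timeout exception when no message arrives within 1.5 s, so the wrapper must handle that without tearing the consumer down.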