ConsumerOffsetChecker returns negative value for log lag
Hi,

I have a Kafka 0.8.1 cluster. I used the ConsumerOffsetChecker tool to check the lag of consumer groups and found that for some partitions the tool returns a negative value in the lag column. Is this a known issue that has been seen before? I find that the negative value prevents the consumer from consuming the latest events in these partitions. How can we work around the problem?

This is the command:

~/kafka_2.9.2-0.8.1$ bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group topic_partition --zkconnect zk001:2181 --topic the_topic | grep ' -'

Below is part of the output. The topic I am checking has 128 partitions, and the tool returns a negative value for 63 of them:

topic_partition  event  6   202936733  28822327  -174114406  topic_partition_m031_29714_20-0
topic_partition  event  10  177322216  36578944  -140743272  topic_partition_m032_16773_16-0
topic_partition  event  11  187891640  28999350  -158892290  topic_partition_m032_16773_17-0

Thanks!
-Yu
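For context on where the negative number comes from: ConsumerOffsetChecker prints the columns Group, Topic, Pid, Offset, logSize, Lag, Owner, and computes Lag as logSize minus the committed offset. A negative lag therefore means the offset stored in ZooKeeper is *ahead* of the log end offset the leader currently reports (which can happen, for example, after an unclean leader election truncates a partition's log). A minimal sketch of that arithmetic, using the partition 6 numbers from the output above:

```java
// Sketch of how ConsumerOffsetChecker derives the Lag column:
// lag = logSize (broker's log end offset) - committed consumer offset.
// A negative result means the committed offset is past the log end offset.
public class LagCheck {
    static long lag(long logEndOffset, long committedOffset) {
        return logEndOffset - committedOffset;
    }

    public static void main(String[] args) {
        long committed = 202936733L; // "Offset" column for partition 6 above
        long logSize   = 28822327L;  // "logSize" column for partition 6 above
        System.out.println(lag(logSize, committed)); // prints -174114406
    }
}
```

With a negative lag the committed offset points past the data the broker actually has, which is consistent with the consumer appearing stuck on those partitions.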
Too much log for kafka.common.KafkaException
Hi, all

Recently I upgraded my Kafka cluster to 0.8.1.1 and set up replication with num.replica.fetchers=5. Last night there was a problem with the network, and soon after I found that the server.log files (not the data logs!) on every node had reached 4GB within an hour. I am not sure whether this is due to an inappropriate configuration or some other reason. Can anybody help me with this? Thanks~

log file tail:

[2014-10-16 20:59:59,994] ERROR Closing socket for /192.168.1.66 because of error (kafka.network.Processor)
kafka.common.KafkaException: This operation cannot be completed on a complete request.
        at kafka.network.Transmission$class.expectIncomplete(Transmission.scala:34)
        at kafka.api.FetchResponseSend.expectIncomplete(FetchResponse.scala:191)
        at kafka.api.FetchResponseSend.writeTo(FetchResponse.scala:214)
        at kafka.network.Processor.write(SocketServer.scala:375)
        at kafka.network.Processor.run(SocketServer.scala:247)
        at java.lang.Thread.run(Thread.java:745)
[2014-10-16 20:59:59,994] ERROR Closing socket for /192.168.1.66 because of error (kafka.network.Processor)
kafka.common.KafkaException: This operation cannot be completed on a complete request.
        at kafka.network.Transmission$class.expectIncomplete(Transmission.scala:34)
        at kafka.api.FetchResponseSend.expectIncomplete(FetchResponse.scala:191)
        at kafka.api.FetchResponseSend.writeTo(FetchResponse.scala:214)
        at kafka.network.Processor.write(SocketServer.scala:375)
        at kafka.network.Processor.run(SocketServer.scala:247)
        at java.lang.Thread.run(Thread.java:745)
[2014-10-16 20:59:59,994] ERROR Closing socket for /192.168.1.65 because of error (kafka.network.Processor)
kafka.common.KafkaException: This operation cannot be completed on a complete request.
        at kafka.network.Transmission$class.expectIncomplete(Transmission.scala:34)
        at kafka.api.FetchResponseSend.expectIncomplete(FetchResponse.scala:191)
        at kafka.api.FetchResponseSend.writeTo(FetchResponse.scala:214)
        at kafka.network.Processor.write(SocketServer.scala:375)
        at kafka.network.Processor.run(SocketServer.scala:247)
        at java.lang.Thread.run(Thread.java:745)

--
Xingcan
Re: Too much log for kafka.common.KafkaException
This looks very similar to the error and stack trace I see when reproducing https://issues.apache.org/jira/browse/KAFKA-1196 -- that's an overflow where the data returned in a FetchResponse exceeds 2GB. (It triggers the error you're seeing because FetchResponse's size overflows to become negative, which breaks the test for whether data has finished sending.) I haven't tested against 0.8.1.1, but it looks identical modulo line numbers.

If it's the same issue, unfortunately it won't fix itself, so that log will just keep growing with more error messages as the consumer keeps reconnecting, requesting data, and then triggering the error in the broker, which forcibly disconnects the consumer.

I'm not certain what to suggest here since KAFKA-1196 still needs a lot of refinement, but given the 0.8.1.1 code I don't think there's much choice but to try to reduce the amount of data that will be returned. One way to do that is to reduce the number of partitions read in each FetchRequest (i.e. make sure FetchRequests address fewer TopicAndPartitions, perhaps putting each TopicAndPartition in its own request). An alternative would be to use more recent offsets (i.e. don't start from the oldest data available in Kafka); a recent enough offset should result in a response well under 2GB.

-Ewen

On Sat, Oct 18, 2014, at 12:07 AM, xingcan wrote:
> Recently, I upgrade my Kafka cluster to 0.8.1.1 and set replication with num.replica.fetchers=5. Last night, there's something wrong with the network. [...]
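The overflow Ewen describes can be illustrated in isolation: if the total response size is accumulated in a 32-bit int, then once the summed per-partition payloads pass Integer.MAX_VALUE (~2GB) the total wraps negative, and any "is there still data left to send?" check built on that size misfires. A self-contained sketch of the failure mode (not Kafka's actual code):

```java
// Illustration of the 2GB overflow behind KAFKA-1196 (not Kafka's actual
// code): summing per-partition payload sizes into a 32-bit int silently
// wraps negative once the total passes Integer.MAX_VALUE, so a
// size-based "transmission complete?" check gives the wrong answer.
public class FetchSizeOverflow {
    static int responseSize(int[] partitionBytes) {
        int total = 0; // 32-bit accumulator, the root of the problem
        for (int b : partitionBytes) {
            total += b; // wraps past Integer.MAX_VALUE without any error
        }
        return total;
    }

    public static void main(String[] args) {
        // Three partitions of ~800MB each: 2.4GB total, which cannot fit in an int.
        int[] sizes = {800_000_000, 800_000_000, 800_000_000};
        int total = responseSize(sizes);
        System.out.println(total < 0); // prints true: the "size" went negative
    }
}
```

This is why fetching fewer partitions per request, or starting from a more recent offset, sidesteps the bug: both keep the response total safely below the 2GB wrap-around point.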
Re: postgresql consumer
Hi, all

I've just set up a 3-node Kafka cluster (9 brokers, 3 per node), and the performance test is OK. Now I am using TridentKafkaSpout and am able to get data from the producer:

BrokerHosts zk = new ZkHosts("10.100.70.128:2181");
TridentKafkaConfig spoutConf = new TridentKafkaConfig(zk, "topictest");
spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());
OpaqueTridentKafkaSpout kafkaSpout = new OpaqueTridentKafkaSpout(spoutConf);
// TransactionalTridentKafkaSpout kafkaSpout = new TransactionalTridentKafkaSpout(spoutConf);
TridentTopology topology = new TridentTopology();
Stream stream = topology.newStream("topictestspout", kafkaSpout).shuffle()
        .each(new Fields("str"), new PrintStream(), new Fields("event_object"))
        .parallelismHint(16);

With the code above I can print out the JSON objects published to the brokers. Instead of printing the messages, I would like to simply write them into a PostgreSQL DB. I downloaded the code from https://github.com/geoforce/storm-postgresql

Here are the problems I have:

1. When I run the storm-postgresql code with messages generated from a RandomTupleSpout(), I am only able to write 100 rows into the PostgreSQL DB, regardless of how I change the PostgresqlStateConfig.
2. Now I want to write the JSON messages into the PostgreSQL DB. Things seem simple: just 2 columns in the table, id and events (which stores the JSON messages). Forgive my dullness, but I couldn't get it to work with storm-postgresql.

I wonder if anyone has done a similar job: getting data from TridentKafkaSpout and writing it exactly once into a PostgreSQL DB. In addition, once the writer starts working, if it stops and restarts for some reason, I would like it to resume consuming from the stopping point instead of from the very beginning. How do I manage the offsets so the writer can restart and continue writing into the DB?
thanks

Alec

Hi, All

I set up a Kafka cluster and plan to publish messages from the Web to Kafka; the messages are in JSON form. I want to implement a consumer that writes the messages it consumes to a PostgreSQL DB, with no aggregation at all. I was thinking of using KafkaSpout in Storm to make this happen, but now I want to simplify the setup and just use a plain Kafka consumer to write the messages into PostgreSQL. This consumer should consume data and write it into the PostgreSQL DB in batches; if the server goes down, the consumer should be able to recover the data it has already stored on disk without duplication, and resume consuming from where it stopped once the server is back up. Is there any sample code for this?

thanks a lot

Alec
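A common pattern for the resume-without-duplicates requirement is to write each batch of rows and the offset of its last message in the same database transaction: on restart the consumer reads the stored offset back and continues from there, and a crash can never commit rows without their offset (or vice versa). The sketch below is a hypothetical, library-free illustration of that bookkeeping; the Sink interface and all names are made up, standing in for a JDBC transaction against the events table:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of batch-plus-offset bookkeeping for a
// Kafka -> PostgreSQL writer. In a real consumer, Sink.commitBatch would
// INSERT the rows and UPDATE the stored offset inside one JDBC transaction,
// so the rows and the offset always commit or roll back together.
public class BatchingWriter {
    public interface Sink {
        // Persist the rows and the offset of the last row atomically.
        void commitBatch(List<String> rows, long lastOffset);
        // Last committed offset, or -1 if nothing has been stored yet.
        long loadOffset();
    }

    private final Sink sink;
    private final int batchSize;
    private final List<String> buffer = new ArrayList<>();
    private long lastBufferedOffset = -1;

    public BatchingWriter(Sink sink, int batchSize) {
        this.sink = sink;
        this.batchSize = batchSize;
    }

    // Where to resume consuming after a restart: one past the stored offset.
    public long resumeOffset() {
        return sink.loadOffset() + 1;
    }

    public void onMessage(long offset, String json) {
        buffer.add(json);
        lastBufferedOffset = offset;
        if (buffer.size() >= batchSize) {
            flush();
        }
    }

    // Commit whatever is buffered (also call this on shutdown).
    public void flush() {
        if (buffer.isEmpty()) return;
        sink.commitBatch(new ArrayList<>(buffer), lastBufferedOffset);
        buffer.clear();
    }
}
```

On restart, resumeOffset() gives the position to hand back to the Kafka consumer; with the 0.8 high-level consumer this implies managing offsets yourself (disabling auto-commit) so that the database, not ZooKeeper, is the source of truth for what has been written.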