ConsumerOffsetChecker returns negative value for log lag

2014-10-18 Thread Yu Yang
Hi,

I have a Kafka 0.8.1 cluster. I used the ConsumerOffsetChecker tool to
check the lag of consumer groups, and found that for some partitions the
tool returns a negative value in the lag column. Is this a known issue that
has been seen before? I find that the negative value prevents the consumer
from consuming the latest events in these partitions. How can we work
around the problem?

The following is the command:

~/kafka_2.9.2-0.8.1$ bin/kafka-run-class.sh
kafka.tools.ConsumerOffsetChecker --group topic_partition --zkconnect
zk001:2181 --topic the_topic | grep ' -'

The following is part of the output. The topic that I am checking has 128
partitions, and the tool returns negative values for 63 of them.

Group            Topic  Pid  Offset     logSize   Lag         Owner
topic_partition  event  6    202936733  28822327  -174114406  topic_partition_m031_29714_20-0
topic_partition  event  10   177322216  36578944  -140743272  topic_partition_m032_16773_16-0
topic_partition  event  11   187891640  28999350  -158892290  topic_partition_m032_16773_17-0
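For context on what a negative lag means arithmetically: ConsumerOffsetChecker computes Lag as logSize minus the committed Offset, so a negative value means the committed consumer offset is ahead of the broker's current log-end offset (which can happen, for example, if the log was truncated after an unclean leader election). A minimal sketch of the arithmetic, using the numbers from partition 6 above (class and method names are illustrative, not from the Kafka tool):

```java
public class LagCheck {
    // Lag as ConsumerOffsetChecker reports it: log-end offset minus committed offset
    static long lag(long logSize, long consumerOffset) {
        return logSize - consumerOffset;
    }

    public static void main(String[] args) {
        // Partition 6 above: logSize 28822327, committed offset 202936733
        System.out.println(lag(28822327L, 202936733L)); // prints -174114406
    }
}
```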



Thanks!

-Yu


Too much log for kafka.common.KafkaException

2014-10-18 Thread xingcan
Hi, all

Recently, I upgraded my Kafka cluster to 0.8.1.1 and set up replication
with num.replica.fetchers=5. Last night there was a network problem, and
soon afterwards I found that the server.log files (not the data logs!) on
every node had reached 4GB within an hour.
I am not sure whether this is caused by an inappropriate configuration or
by something else. Can anybody help me with this? Thanks~

log file tail

[2014-10-16 20:59:59,994] ERROR Closing socket for /192.168.1.66 because of
error (kafka.network.Processor)
kafka.common.KafkaException: This operation cannot be completed on a
complete request.
at
kafka.network.Transmission$class.expectIncomplete(Transmission.scala:34)
at
kafka.api.FetchResponseSend.expectIncomplete(FetchResponse.scala:191)
at kafka.api.FetchResponseSend.writeTo(FetchResponse.scala:214)
at kafka.network.Processor.write(SocketServer.scala:375)
at kafka.network.Processor.run(SocketServer.scala:247)
at java.lang.Thread.run(Thread.java:745)
[2014-10-16 20:59:59,994] ERROR Closing socket for /192.168.1.66 because of
error (kafka.network.Processor)
kafka.common.KafkaException: This operation cannot be completed on a
complete request.
at
kafka.network.Transmission$class.expectIncomplete(Transmission.scala:34)
at
kafka.api.FetchResponseSend.expectIncomplete(FetchResponse.scala:191)
at kafka.api.FetchResponseSend.writeTo(FetchResponse.scala:214)
at kafka.network.Processor.write(SocketServer.scala:375)
at kafka.network.Processor.run(SocketServer.scala:247)
at java.lang.Thread.run(Thread.java:745)
[2014-10-16 20:59:59,994] ERROR Closing socket for /192.168.1.65 because of
error (kafka.network.Processor)
kafka.common.KafkaException: This operation cannot be completed on a
complete request.
at
kafka.network.Transmission$class.expectIncomplete(Transmission.scala:34)
at
kafka.api.FetchResponseSend.expectIncomplete(FetchResponse.scala:191)
at kafka.api.FetchResponseSend.writeTo(FetchResponse.scala:214)
at kafka.network.Processor.write(SocketServer.scala:375)
at kafka.network.Processor.run(SocketServer.scala:247)
at java.lang.Thread.run(Thread.java:745)



-- 
*Xingcan*


Re: Too much log for kafka.common.KafkaException

2014-10-18 Thread Ewen Cheslack-Postava
This looks very similar to the error and stack trace I see when
reproducing https://issues.apache.org/jira/browse/KAFKA-1196 -- that's
an overflow where the data returned in a FetchResponse exceeds 2GB. (It
triggers the error you're seeing because FetchResponse's size overflows
to become negative, which breaks the tests for whether data has finished
sending.) I haven't tested against 0.8.1.1, but it looks identical
modulo line numbers. If it's the same issue, unfortunately it won't fix
itself, so that log will just keep growing with more error messages as
the consumer keeps reconnecting, requesting data, and triggering the
error in the broker, which forcibly disconnects the consumer.
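(As a stopgap for the log growth itself, independent of the root cause, Kafka's log4j.properties can be switched to a size-capped roller. A sketch assuming the stock log4j 1.2 setup that ships with 0.8.1.1; the size and backup limits here are illustrative:)

```properties
# Sketch: cap server.log growth with a size-based roller (log4j 1.2 syntax);
# replaces the default DailyRollingFileAppender for kafkaAppender
log4j.appender.kafkaAppender=org.apache.log4j.RollingFileAppender
log4j.appender.kafkaAppender.File=${kafka.logs.dir}/server.log
log4j.appender.kafkaAppender.MaxFileSize=100MB
log4j.appender.kafkaAppender.MaxBackupIndex=10
log4j.appender.kafkaAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.kafkaAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
```

This bounds disk usage at roughly MaxFileSize * (MaxBackupIndex + 1) per broker, but of course the errors will still flood the capped files until the underlying fetch problem is addressed.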

I'm not certain what to suggest here since KAFKA-1196 still needs a lot
of refinement. But given the 0.8.1.1 code, I don't think there's much
choice but to try to reduce the amount of data that will be returned.
One way to do that is to reduce the number of partitions read in each
FetchRequest (i.e. make sure FetchRequests address fewer
TopicAndPartitions, maybe putting each TopicAndPartition in its own
request). An alternative would be to use more recent offsets (i.e. don't
start from the oldest data available in Kafka). A recent enough offset
should result in a response under 2GB.
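The overflow itself is ordinary 32-bit arithmetic: the response size is tracked in a Java int, so once the total payload passes Integer.MAX_VALUE (2GB - 1 byte) it wraps negative. A hypothetical illustration of the wrap, not the broker's actual code:

```java
public class SizeOverflow {
    // Sum per-partition byte counts the way a 32-bit size field would
    static int totalSize(int[] partitionBytes) {
        int total = 0;
        for (int b : partitionBytes) {
            total += b; // silently wraps once the sum passes Integer.MAX_VALUE
        }
        return total;
    }

    public static void main(String[] args) {
        // Three partitions of ~800MB each: 2.4GB total exceeds the int range
        int[] sizes = {800_000_000, 800_000_000, 800_000_000};
        System.out.println(totalSize(sizes)); // prints -1894967296
    }
}
```

A negative "remaining bytes" value like this is exactly what breaks the broker's check for whether the send has completed.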
 
-Ewen

On Sat, Oct 18, 2014, at 12:07 AM, xingcan wrote:
 Hi, all
 
 Recently, I upgrade my Kafka cluster  to 0.8.1.1 and set replication with
 num.replica.fetchers=5. Last night, there's something wrong with the
 network. Soon, I found the server.log files (not data log!) on every node
 reached 4GB in an hour.
 [...]
 
 -- 
 *Xingcan*


Re: postgresql consumer

2014-10-18 Thread Sa Li

Hi, all

I've just set up a 3-node Kafka cluster (9 brokers, 3 per node), and the
performance tests look OK. Now I am using TridentKafkaSpout and am able to
get data from the producer; see:
BrokerHosts zk = new ZkHosts("10.100.70.128:2181");
TridentKafkaConfig spoutConf = new TridentKafkaConfig(zk, "topictest");
spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());
OpaqueTridentKafkaSpout kafkaSpout = new OpaqueTridentKafkaSpout(spoutConf);
// TransactionalTridentKafkaSpout kafkaSpout = new TransactionalTridentKafkaSpout(spoutConf);
TridentTopology topology = new TridentTopology();
Stream stream = topology.newStream("topictestspout", kafkaSpout).shuffle()
        .each(new Fields("str"),
              new PrintStream(),
              new Fields("event_object"))
        .parallelismHint(16);


With the above code I can print out the JSON objects published to the
brokers. Instead of printing the messages, I would like to simply write
them into a PostgreSQL DB. I downloaded the code from
https://github.com/geoforce/storm-postgresql

Here are the problems I have:
1. When I run the storm-postgresql code with messages generated from
RandomTupleSpout(), I am only able to write 100 rows into the PostgreSQL
DB, regardless of how I change the PostgresqlStateConfig.

2. Now I want to write the JSON messages into the PostgreSQL DB. Things
seem simple: just two columns in the table, id and events, where events
stores the JSON messages. Forgive my dullness, but I couldn't get it to
work with storm-postgresql.

I wonder if anyone has done a similar job: getting data from
TridentKafkaSpout and writing it into a PostgreSQL DB. In addition, if the
writer stops and restarts for some reason, I would like it to resume
consuming from the point where it stopped instead of from the very
beginning. How should I manage the offsets and restart writing into the DB?
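One common pattern for the resume-from-stop behavior described above is to commit the consumer offset in the same database transaction as the data itself, then on restart read back the highest stored offset and continue from there. A minimal sketch of the idea; the names are illustrative (not from storm-postgresql), and the in-memory list stands in for the PostgreSQL table:

```java
import java.util.ArrayList;
import java.util.List;

public class OffsetTrackingWriter {
    // Stands in for the events table: each row keeps the message and its source offset
    static class Row {
        final long offset;
        final String event;
        Row(long offset, String event) { this.offset = offset; this.event = event; }
    }

    private final List<Row> table = new ArrayList<>();

    // Write a batch "transactionally": the events and their offsets land together
    void writeBatch(long startOffset, List<String> events) {
        List<Row> staged = new ArrayList<>();
        long off = startOffset;
        for (String e : events) {
            staged.add(new Row(off++, e));
        }
        table.addAll(staged); // commit point: in PostgreSQL this would be one transaction
    }

    // On restart, resume from the row after the highest committed offset
    long resumeOffset() {
        return table.stream().mapToLong(r -> r.offset).max().orElse(-1L) + 1;
    }
}
```

Because the offset and the data commit atomically, a crash between batches can never leave the stored offset out of sync with the rows already written, so replaying from resumeOffset() yields no duplicates and no gaps.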


thanks

Alec 

 Hi, All
 
 I set up a Kafka cluster and plan to publish messages from the Web to
 Kafka; the messages are in JSON form. I want to implement a consumer that
 writes the messages it consumes to a PostgreSQL DB, with no aggregation at
 all. I was thinking of using KafkaSpout in Storm to make this happen; now
 I want to simplify the setup and just use a plain Kafka consumer to write
 messages into PostgreSQL. This consumer should consume data and write it
 into the PostgreSQL DB in batches; if the server goes down, the consumer
 should be able to recover the data it has stored on disk without
 duplication and resume consuming from where it stopped once the server is
 back up.
 
 Is there any sample code for this?
 
 
 thanks a lot
 
 
 Alec