Hi all,
I've just set up a 3-node Kafka cluster (9 brokers, 3 per node), and the
performance test looks OK. Now I am using TridentKafkaSpout and am able to
get data from the producer; see:
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.tuple.Fields;
import storm.kafka.BrokerHosts;
import storm.kafka.StringScheme;
import storm.kafka.ZkHosts;
import storm.kafka.trident.OpaqueTridentKafkaSpout;
import storm.kafka.trident.TridentKafkaConfig;
import storm.trident.Stream;
import storm.trident.TridentTopology;

BrokerHosts zk = new ZkHosts("10.100.70.128:2181");
TridentKafkaConfig spoutConf = new TridentKafkaConfig(zk, "topictest");
spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());
OpaqueTridentKafkaSpout kafkaSpout = new OpaqueTridentKafkaSpout(spoutConf);
// TransactionalTridentKafkaSpout kafkaSpout = new TransactionalTridentKafkaSpout(spoutConf);

TridentTopology topology = new TridentTopology();
// PrintStream here is my own BaseFunction that prints each tuple.
Stream stream = topology.newStream("topictestspout", kafkaSpout)
        .shuffle()
        .each(new Fields("str"), new PrintStream(), new Fields("event_object"))
        .parallelismHint(16);
With the above code I can print out the JSON objects published to the
brokers. Instead of printing the messages, I would like to write them into
a PostgreSQL DB. I downloaded the code from
https://github.com/geoforce/storm-postgresql
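From reading the Trident docs, my understanding is that the printing each()
step should be replaced by a partitionPersist() into a custom State that
does the JDBC writes. Here is a rough sketch of the wiring I have in mind;
PostgresState.Factory and PostgresUpdater are placeholder names of my own
(sketched further below), not classes from storm-postgresql:

TridentTopology topology = new TridentTopology();
topology.newStream("topictestspout", kafkaSpout)
        .shuffle()
        .partitionPersist(new PostgresState.Factory(),  // builds the JDBC-backed state
                          new Fields("str"),            // the field StringScheme emits
                          new PostgresUpdater())        // batches tuples into INSERTs
        .parallelismHint(16);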
Here are the problems I have:
1. When I run the storm-postgresql code with messages generated from a
RandomTupleSpout(), I can only write 100 rows into the PostgreSQL DB,
regardless of how I change the PostgresqlStateConfig.
2. Now I want to write the JSON messages into the PostgreSQL DB. Things
seem simple: the table has just 2 columns, id and events, where events
stores the JSON messages. Forgive my dullness, but I couldn't get it to
work with storm-postgresql (see the sketch just below for what I have been
trying).
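For reference, here is the rough State/StateUpdater pair I have been
trying, assuming a hypothetical table created as
CREATE TABLE kafka_events (id serial primary key, events json);
the class names, table name, connection string, and credentials are
placeholders, and error handling is stripped down:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.util.List;
import java.util.Map;

import backtype.storm.task.IMetricsContext;
import storm.trident.operation.TridentCollector;
import storm.trident.state.BaseStateUpdater;
import storm.trident.state.State;
import storm.trident.state.StateFactory;
import storm.trident.tuple.TridentTuple;

// Minimal JDBC-backed Trident state; one connection per partition.
public class PostgresState implements State {
    private final Connection conn;

    public PostgresState(Connection conn) { this.conn = conn; }

    // Trident calls these around each batch; a plain at-least-once
    // sketch like this one doesn't use the txid.
    @Override public void beginCommit(Long txid) {}
    @Override public void commit(Long txid) {}

    public void write(List<TridentTuple> tuples) {
        try (PreparedStatement ps = conn.prepareStatement(
                "INSERT INTO kafka_events (events) VALUES (?::json)")) {
            for (TridentTuple t : tuples) {
                ps.setString(1, t.getStringByField("str"));
                ps.addBatch();
            }
            ps.executeBatch();
        } catch (Exception e) {
            throw new RuntimeException(e);  // fail the batch so Trident replays it
        }
    }

    public static class Factory implements StateFactory {
        @Override
        public State makeState(Map conf, IMetricsContext metrics,
                               int partitionIndex, int numPartitions) {
            try {
                Connection c = DriverManager.getConnection(
                        "jdbc:postgresql://10.100.70.128:5432/mydb", "user", "pass");
                return new PostgresState(c);
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
    }
}

// Hands each batch of tuples to the state for insertion.
class PostgresUpdater extends BaseStateUpdater<PostgresState> {
    @Override
    public void updateState(PostgresState state, List<TridentTuple> tuples,
                            TridentCollector collector) {
        state.write(tuples);
    }
}

My thinking is that throwing from write() fails the batch so Trident
replays it, which should give at-least-once delivery with the opaque
spout; is that right?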
I wonder if anyone has done a similar job: getting data from
TridentKafkaSpout and writing it directly into a PostgreSQL DB. In
addition, once the writer starts working, if it stops and restarts for
some reason, I would like it to resume consuming from the stop point
instead of from the very beginning. How do I manage the offsets so that a
restart resumes writing into the DB?
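For what it's worth, my current understanding (please correct me if I'm
wrong) is that OpaqueTridentKafkaSpout already checkpoints the Kafka
offsets in Zookeeper against the txid state for the stream id
("topictestspout" above), so a restarted topology should resume from the
last committed offset as long as that id stays the same and forceFromStart
is not set, i.e. something like:

// Resume from the offsets saved in Zookeeper for this spout;
// only fall back to startOffsetTime when no saved state exists.
spoutConf.forceFromStart = false;
spoutConf.startOffsetTime = kafka.api.OffsetRequest.EarliestTime();

Is that the right way to manage the offsets, or do I need to track them
myself in the DB?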
Thanks,
Alec