Hi all,
Continuing the above topic: I generate a stream from Kafka, and the PostgresqlState
is configured as
final PostgresqlStateConfig config = new PostgresqlStateConfig();
config.setUrl(dburl);
config.setTable("test.state");
config.setKeyColumns(new String[]{"id"});
config.setValueColumns(new String[]{"word"});
config.setType(StateType.NON_TRANSACTIONAL);
config.setCacheSize(1000);
I want to use persistentAggregate, something like

stream.persistentAggregate(PostgresqlState.newFactory(config),
    new Fields("id", "word"), new Equals(), new Fields("idKey", "words"));

Equals would pass the "id" and "word" fields through without doing anything,
but would still multi-put to the database. My question is: is Equals() the
right function to use here?
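For context on why I'm unsure: in Trident, `Equals` (in `storm.trident.operation.builtin`) is a filter, while `persistentAggregate` expects a `CombinerAggregator` or `ReducerAggregator`, so I suspect I need a reducer-style object instead. Below is a minimal stand-alone sketch of the reduce logic I have in mind; the `Reducer` interface is just a stand-in mirroring the shape of `storm.trident.operation.ReducerAggregator`, and the `WordConcat` class is my own illustration, not part of storm-postgresql:

```java
import java.util.Arrays;
import java.util.List;

// Stand-in for storm.trident.operation.ReducerAggregator<T>; in a real
// topology you would implement the Storm interface itself.
interface Reducer<T> {
    T init();                            // initial value per key
    T reduce(T curr, List<Object> tuple); // fold one tuple into the state
}

// Hypothetical reducer: folds the "word" field (tuple index 1) of each
// tuple for a key into one space-separated string. Something of this shape
// is what persistentAggregate would take in place of the Equals filter.
class WordConcat implements Reducer<String> {
    public String init() {
        return "";
    }
    public String reduce(String curr, List<Object> tuple) {
        String word = (String) tuple.get(1);
        return curr.isEmpty() ? word : curr + " " + word;
    }
}

public class Demo {
    public static void main(String[] args) {
        WordConcat agg = new WordConcat();
        String state = agg.init();
        // Simulate two (id, word) tuples arriving for the same key.
        state = agg.reduce(state, Arrays.asList((Object) "1", "hello"));
        state = agg.reduce(state, Arrays.asList((Object) "1", "world"));
        System.out.println(state); // prints "hello world"
    }
}
```

If that understanding is right, the call would look something like `stream.persistentAggregate(PostgresqlState.newFactory(config), new Fields("id", "word"), new WordConcat(), new Fields("words"))`, with the aggregated value being what gets multi-put to the database.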
thanks
Alec
On Thu, Oct 16, 2014 at 10:58 AM, Sa Li <[email protected]> wrote:
> Hi, all
>
> I've just made a 3-node Kafka cluster (9 brokers, 3 per node), and the
> performance test is OK. Now I am using tridentKafkaSpout and am able to
> get data from the producer; see
>
> BrokerHosts zk = new ZkHosts("10.100.70.128:2181");
> TridentKafkaConfig spoutConf = new TridentKafkaConfig(zk, "topictest");
> spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());
> OpaqueTridentKafkaSpout kafkaSpout = new OpaqueTridentKafkaSpout(spoutConf);
> // TransactionalTridentKafkaSpout kafkaSpout = new TransactionalTridentKafkaSpout(spoutConf);
> TridentTopology topology = new TridentTopology();
> Stream stream = topology.newStream("topictestspout", kafkaSpout)
>         .shuffle()
>         .each(new Fields("str"), new PrintStream(), new Fields("event_object"))
>         .parallelismHint(16);
>
>
> With the above code, I can print out the JSON objects published to the
> brokers. Instead of printing the messages, I would like to simply write
> them into a PostgreSQL DB. I downloaded the code from
> https://github.com/geoforce/storm-postgresql
>
> Here are the problems I have:
> 1. When I run the storm-postgresql code with messages generated by a
> RandomTupleSpout(), I am only able to write 100 rows into the PostgreSQL
> DB, regardless of how I change the PostgresqlStateConfig.
>
> 2. Now I want to write the JSON messages into the PostgreSQL DB. Things
> seem simple: just 2 columns in the table, id and events, where events
> stores the JSON messages. Forgive my dullness, but I couldn't get it to
> work with storm-postgresql.
>
> I wonder if anyone has done a similar job: getting data from the
> tridentKafkaSpout and writing it into a PostgreSQL DB. In addition, once
> the writer starts working, if it stops and restarts for some reason, I
> would like the writer to resume consuming from the stopping point instead
> of from the very beginning. How do I manage the offsets and restart
> writing into the DB?
>
>
> thanks
>
> Alec
>