Hi, All,

Continuing the topic above: I generate a stream from Kafka, and PostgresqlState
is configured as follows:

final PostgresqlStateConfig config = new PostgresqlStateConfig();
{
   config.setUrl(dburl);
   config.setTable("test.state");
   config.setKeyColumns(new String[]{"id"});
   config.setValueColumns(new String[]{"word"});
   config.setType(StateType.NON_TRANSACTIONAL);
   config.setCacheSize(1000);
}

I want to use persistentAggregate, something like:
   stream.persistentAggregate(PostgresqlState.newFactory(config),
       new Fields("id", "word"), new Equals(), new Fields("idKey", "words"));

My understanding is that Equals will pass the "id" and "word" fields through
unchanged, and the state will then multi-put them to the database. My
question is: is Equals() the right function to use here?
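
For reference, the third argument of persistentAggregate has to be an
aggregator (CombinerAggregator, ReducerAggregator or Aggregator). If Equals
here is the built-in storm.trident.operation.builtin.Equals, that class is a
Filter, so it would likely not be accepted in that position. Below is a
minimal sketch of a "keep the latest word per id" combiner. The Combiner
interface is a local stand-in that only mirrors the init/combine/zero shape of
Trident's CombinerAggregator so the example runs without Storm; KeepLastWord
and the fold helper are hypothetical names, and "latest" assumes values are
combined in arrival order.

```java
import java.util.Arrays;
import java.util.List;

// Local stand-in (assumption): same three-method shape as Trident's
// CombinerAggregator<T>, declared here only so the sketch runs without Storm.
interface Combiner<T> {
    T init(List<Object> tuple);   // value contributed by a single tuple
    T combine(T a, T b);          // merge two partial values
    T zero();                     // value used when there are no tuples
}

// Hypothetical aggregator: keeps the most recently combined word.
final class KeepLastWord implements Combiner<String> {
    @Override
    public String init(List<Object> tuple) {
        // Assumes the only input field is "word", at position 0.
        return (String) tuple.get(0);
    }

    @Override
    public String combine(String a, String b) {
        // Prefer the newer value; fall back to the older one if it is null.
        return b != null ? b : a;
    }

    @Override
    public String zero() {
        return null;
    }
}

final class KeepLastWordDemo {
    // Folds the tuples of one key, starting from zero(), in list order.
    static String fold(Combiner<String> agg, List<List<Object>> tuples) {
        String acc = agg.zero();
        for (List<Object> t : tuples) {
            acc = agg.combine(acc, agg.init(t));
        }
        return acc;
    }

    public static void main(String[] args) {
        List<List<Object>> tuples = Arrays.asList(
                Arrays.<Object>asList("hello"),
                Arrays.<Object>asList("world"));
        System.out.println(fold(new KeepLastWord(), tuples)); // prints "world"
    }
}
```

In a real topology this would presumably be wired in as
stream.groupBy(new Fields("id")).persistentAggregate(PostgresqlState.newFactory(config),
new Fields("word"), <aggregator>, new Fields("words")), so that each id becomes
a key in the state. This wiring is untested against storm-postgresql.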


thanks

Alec


On Thu, Oct 16, 2014 at 10:58 AM, Sa Li <[email protected]> wrote:

> Hi, all
>
> I've just set up a 3-node Kafka cluster (9 brokers, 3 per node), and the
> performance test is OK. I am now using tridentKafkaSpout and am able to
> get data from the producer:
>
> BrokerHosts zk = new ZkHosts("10.100.70.128:2181");
> TridentKafkaConfig spoutConf = new TridentKafkaConfig(zk, "topictest");
> spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());
> OpaqueTridentKafkaSpout kafkaSpout = new OpaqueTridentKafkaSpout(spoutConf);
> // TransactionalTridentKafkaSpout kafkaSpout = new TransactionalTridentKafkaSpout(spoutConf);
> TridentTopology topology = new TridentTopology();
> Stream stream = topology.newStream("topictestspout", kafkaSpout).shuffle()
>         .each(new Fields("str"), new PrintStream(), new Fields("event_object"))
>         .parallelismHint(16);
>
>
> With the above code, I can print out the JSON objects published to the
> brokers. Instead of printing the messages, I would like to simply write
> them into a PostgreSQL DB. I downloaded the code from
> https://github.com/geoforce/storm-postgresql
>
> Here are the problems I have:
> 1. When I run the storm-postgresql code, with messages generated by a
> RandomTupleSpout(), I am only able to write 100 rows into the PostgreSQL
> DB, regardless of how I change the PostgresqlStateConfig.
>
> 2. Now I want to write the JSON messages into the PostgreSQL DB. It seems
> like it should be simple: the DB table has just 2 columns, id and events,
> where events stores the JSON messages. Forgive my dullness, but I couldn't
> get it to work with storm-postgresql.
>
> I wonder if anyone has done a similar job, namely getting data from
> tridentKafkaSpout and writing it into a PostgreSQL DB. In addition, once
> the writer is working, if it stops and restarts for some reason, I would
> like it to resume consuming from the point where it stopped instead of
> from the very beginning. How do I manage the offset so that it resumes
> writing into the DB from there?
>
>
> thanks
>
> Alec
>
>
>
>
>
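
On the restart question quoted above: as far as I understand, the Trident
Kafka spout keeps its per-batch offsets in ZooKeeper, so after a restart a
batch may be replayed, and with the opaque spout a replayed batch can contain
different tuples. What keeps the DB correct is then the state's update rule.
The sketch below shows the opaque-transactional rule on an in-memory counter:
each row stores the txid, the previous value and the current value. OpaqueRow
and OpaqueCounterStore are hypothetical names, the HashMap stands in for the
PostgreSQL table, and this is not storm-postgresql's actual code.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical row layout: what an opaque state keeps per key.
final class OpaqueRow {
    final long txid;  // batch that last wrote this row
    final long prev;  // value before that batch was applied
    final long curr;  // value after that batch was applied

    OpaqueRow(long txid, long prev, long curr) {
        this.txid = txid;
        this.prev = prev;
        this.curr = curr;
    }
}

// In-memory stand-in (assumption) for the database table.
final class OpaqueCounterStore {
    private final Map<String, OpaqueRow> rows = new HashMap<>();

    // Applies one batch's partial count for a key.
    void apply(String key, long txid, long delta) {
        OpaqueRow old = rows.get(key);
        long base;
        if (old == null) {
            base = 0;            // first write for this key
        } else if (old.txid == txid) {
            base = old.prev;     // replayed batch: discard its earlier effect
        } else {
            base = old.curr;     // new batch: build on the current value
        }
        rows.put(key, new OpaqueRow(txid, base, base + delta));
    }

    long get(String key) {
        OpaqueRow row = rows.get(key);
        return row == null ? 0 : row.curr;
    }
}
```

With this rule, replaying txid 1 with a different delta overwrites the earlier
result for txid 1 instead of adding to it, which is why a restart does not
double-count. A NON_TRANSACTIONAL state stores only the value, so it cannot
tell a replay from a new batch.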
