Hi,

you probably want a cursor on your query that reads your table once and
emits a tuple for each row, plus a mechanism to determine when the spout
is done.
Here is a MongoDB spout that may do what you want:
https://github.com/stormprocessor/storm-mongo/blob/master/src/main/java/storm/mongo/MongoSpout.java
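
A minimal sketch of the idea, in case it helps. It is plain Java and does
not use the Storm API: OneShotRowSource, its Iterator and its Consumer are
hypothetical stand-ins. In a real spout I would assume the Iterator wraps a
JDBC ResultSet opened once in open(), and the Consumer is your
_collector.emit(new Values(row)).

```java
import java.util.Iterator;
import java.util.function.Consumer;

/**
 * Hypothetical stand-in for a spout that reads a table exactly once.
 * Not Storm API: the Iterator plays the role of a cursor opened a single
 * time, and the Consumer plays the role of the output collector.
 */
class OneShotRowSource {
    private final Iterator<String> rows;    // cursor over the query result, created once
    private final Consumer<String> emitter; // stand-in for _collector.emit(new Values(row))
    private boolean done = false;           // set once the cursor is exhausted

    OneShotRowSource(Iterator<String> rows, Consumer<String> emitter) {
        this.rows = rows;
        this.emitter = emitter;
    }

    /** Called repeatedly by the framework; emits at most one row per call. */
    void nextTuple() {
        if (done) {
            return; // nothing left: do not query or emit again
        }
        if (rows.hasNext()) {
            emitter.accept(rows.next());
        } else {
            done = true; // the place to close ResultSet, Statement and Connection
        }
    }

    boolean isDone() {
        return done;
    }
}
```

The point is that the query runs once and the cursor lives in a field, so
repeated nextTuple calls walk forward through the rows instead of starting
over. Once the done flag is set, further calls return without emitting.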

regards,
Barrie

2014-03-12 12:55 GMT+01:00 Ramprasad Ranganathan <[email protected]>:
> Yes, nextTuple is called every 1 ms by default. Then how do I read each row
> only once? If a row is read several times, duplicate tuples will be created
> and sent to the bolts for processing.
>
>
> On Wed, Mar 12, 2014 at 5:14 PM, Nathan Leung <[email protected]> wrote:
>>
>> nextTuple is called continuously on the spout, so you are reading your
>> table several times.
>>
>>
>> On Wed, Mar 12, 2014 at 6:59 AM, Ramprasad Ranganathan <[email protected]>
>> wrote:
>>>
>>> In my spout I read data from a Postgres table and emit each row's test_data
>>> field as a tuple to a bolt. My table contains only 19 rows, so the spout
>>> should emit only 19 tuples. But the Storm UI shows 180 tuples emitted
>>> from the spout. How did this happen?
>>>
>>> public void nextTuple() {
>>>     Connection c = null;
>>>     Statement stmt = null;
>>>     try {
>>>         Class.forName("org.postgresql.Driver");
>>>         c = DriverManager.getConnection(
>>>                 "jdbc:postgresql://localhost:5432/my_db", "postgres", "password");
>>>         c.setAutoCommit(false);
>>>         System.out.println("Opened database successfully");
>>>
>>>         stmt = c.createStatement();
>>>         ResultSet rs = stmt.executeQuery(
>>>                 "SELECT test_data FROM my_schema.load_test_table"
>>>                 + " where process_status='waiting' ;");
>>>         while (rs.next()) {
>>>             String geodata = rs.getString("test_data");
>>>             _collector.emit(new Values(geodata));
>>>         }
>>>         rs.close();
>>>         stmt.close();
>>>         c.close();
>>>     } catch (Exception e) {
>>>         System.err.println(e.getClass().getName() + ": " + e.getMessage());
>>>         System.exit(0);
>>>     }
>>>     System.out.println("Operation done successfully");
>>> }
>>>
>>>
>>>
>>> --
>>> by
>>>
>>> R.RAMPRASAD
>>
>>
>
>
>
> --
> by
>
> R.RAMPRASAD
