If I were you, I might assign a monotonically increasing ID to the rows,
store the last-read offset somewhere, and use it to avoid reading the same
row twice. This is similar to what Kafka does. It may be more
straightforward to put the row into a Kafka topic when you add it to the DB
in the first place, so that you don't have to code this part yourself.

On Wed, Mar 12, 2014 at 12:29 PM, B Kersbergen <[email protected]> wrote:

> Hi,
>
> You probably want to open a cursor on the query that reads your table,
> emit a tuple for each row, and have a mechanism that determines when the
> spout is done.
> Here is a MongoDB spout that does what you may want:
>
> https://github.com/stormprocessor/storm-mongo/blob/master/src/main/java/storm/mongo/MongoSpout.java
>
> regards,
> Barrie
>
> 2014-03-12 12:55 GMT+01:00 Ramprasad Ranganathan <[email protected]>:
> > Yes, nextTuple is called every 1 ms by default. So how do I read each row
> > only once? If it reads the table several times, duplicate tuples are
> > created and sent to bolts for processing.
> >
> >
> > On Wed, Mar 12, 2014 at 5:14 PM, Nathan Leung <[email protected]> wrote:
> >>
> >> nextTuple is called continuously, so your spout re-reads the whole table
> >> on every call.
> >>
> >>
> >> On Wed, Mar 12, 2014 at 6:59 AM, Ramprasad Ranganathan <[email protected]> wrote:
> >>>
> >>> In my spout I read data from a Postgres table and emit each row's
> >>> test_data field as a tuple to a bolt. My table contains only 19 rows, so
> >>> the spout should emit only 19 tuples. But the Storm UI shows 180 tuples
> >>> emitted from the spout. How did this happen?
> >>>
> >>> public void nextTuple() {
> >>>     // NOTE: Storm invokes nextTuple() repeatedly, so this query runs
> >>>     // (and re-emits every matching row) on each call.
> >>>     try {
> >>>         Class.forName("org.postgresql.Driver");
> >>>         try (Connection c = DriverManager.getConnection(
> >>>                  "jdbc:postgresql://localhost:5432/my_db",
> >>>                  "postgres", "password");
> >>>              Statement stmt = c.createStatement();
> >>>              ResultSet rs = stmt.executeQuery(
> >>>                  "SELECT test_data FROM my_schema.load_test_table"
> >>>                  + " WHERE process_status = 'waiting'")) {
> >>>             System.out.println("Opened database successfully");
> >>>             while (rs.next()) {
> >>>                 String geodata = rs.getString("test_data");
> >>>                 _collector.emit(new Values(geodata));
> >>>             }
> >>>         } // try-with-resources closes rs, stmt and c, even on error
> >>>         System.out.println("Operation done successfully");
> >>>     } catch (ClassNotFoundException | SQLException e) {
> >>>         // Log instead of calling System.exit(0), which would kill
> >>>         // the whole worker process.
> >>>         System.err.println(e.getClass().getName() + ": " + e.getMessage());
> >>>     }
> >>> }
> >>>
> >>>
> >>>
> >>> --
> >>> by
> >>>
> >>> R.RAMPRASAD
> >>
> >>
> >
> >
> >
> > --
> > by
> >
> > R.RAMPRASAD
>
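Barrie's suggestion quoted above (open a cursor once, emit one tuple per row, and track when the spout is done) can be sketched as follows. This is a minimal, framework-free illustration, not Storm's `BaseRichSpout`: `CursorSpout` is hypothetical, a `Supplier<Iterator<String>>` stands in for running the SELECT and reading the `ResultSet`, and a `Consumer<String>` stands in for `_collector.emit`. The key change from the original code is that the query runs once, in `open()`, instead of on every `nextTuple()` call.

```java
import java.util.Iterator;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical, framework-free stand-in for a spout: open() runs once,
// nextTuple() is called repeatedly by the (simulated) framework.
class CursorSpout {
    private final Supplier<Iterator<String>> query; // runs the SELECT
    private final Consumer<String> emit;            // stands in for _collector.emit
    private Iterator<String> cursor;
    private boolean done;

    CursorSpout(Supplier<Iterator<String>> query, Consumer<String> emit) {
        this.query = query;
        this.emit = emit;
    }

    // Storm calls open() once per spout task, so the query is
    // executed here rather than in nextTuple().
    void open() {
        cursor = query.get();
    }

    // Emits at most one row per call; once the cursor is exhausted it
    // sets `done`, and every later call is a no-op.
    void nextTuple() {
        if (done) {
            return;
        }
        if (cursor.hasNext()) {
            emit.accept(cursor.next());
        } else {
            done = true;
        }
    }

    boolean isDone() {
        return done;
    }
}
```

Emitting one row per call keeps `nextTuple()` short, which matters because Storm calls `nextTuple`, `ack`, and `fail` on the same thread. If you keep a real JDBC `ResultSet` open across calls, note that the PostgreSQL JDBC driver generally streams rows through a cursor only when autocommit is off and a non-zero fetch size is set; otherwise it loads the whole result into memory.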
