Hi, all
 

I am trying to implement a Trident spout that reads data from a database:


             @SuppressWarnings("serial")
             public static class PostgresDBSpout implements IBatchSpout {

                              private static final long serialVersionUID = 1L;
                              SpoutOutputCollector _collector;
                              boolean _isDistributed;

                              public PostgresDBSpout(boolean isDistributed) {
                                     _isDistributed = isDistributed;
                              }

                              public boolean isDistributed() {
                                     return true;
                              }

                              @Override
                              public void emitBatch(final long batchId, final 
TridentCollector collector) {
                                           _collector = collector ;
                                           Connection conn = null ;
                                           LinkedList listOfMeasurement = new 
LinkedList() ;
                                           conn  = connectToDatabaseOrDie();
                                           fetchMeasurementSample(conn, 
listOfMeasurement);

                                           Iterator it = 
listOfMeasurement.iterator();
                                           while (it.hasNext())
                                           {
                                                 Measurement measurement = 
(Measurement)it.next();
                                                 System.out.println("id: " + 
measurement.id);
                                                 _collector.emit(new 
Values(measurement.sensor_id, measurement.period, measurement.powermon_num, 
measurement.current,
                                                                            
measurement.id, measurement.measurement_timestamp, 
measurement.measurement_dateuploaded));
                                            }
                              }

                              @Override
                              @SuppressWarnings("rawtypes")
                              public void open(final Map conf, final 
TopologyContext context) {
                              }

                              private void fetchMeasurementSample(Connection 
conn, LinkedList listOfMeasurement)
                              {
                                       try
                                       {
                                             Statement st = 
conn.createStatement();
                                             ResultSet rs = 
st.executeQuery("SELECT * FROM stream_producer.measurements_sample ORDER BY 
date_uploaded asc");
                                             while ( rs.next() )
                                             {
                                                   Measurement measurement = 
new Measurement();
                                                   measurement.sensor_id        
           =  rs.getString("sensor_id");
                                                   measurement.period           
           =  rs.getInt("period");
                                                   measurement.powermon_num     
           =  rs.getLong("powermon_num") ;
                                                   measurement.current          
           =  Float.parseFloat(rs.getString("current")) ;
                                                   measurement.id               
           =  rs.getString("id") ;
                                                   
measurement.measurement_timestamp       =  
rs.getString("measurement_timestamp") ;
                                                   
measurement.measurement_dateuploaded    =  
rs.getString("measurement_dateuploaded") ;
                                                   
listOfMeasurement.add(measurement);
                                             }
                                             rs.close();
                                             st.close();
                                       }
                                       catch (SQLException se) {
                                                   logger.error("Threw a 
SQLException creating the list of blogs.");
                                                   
logger.error(se.getMessage());
                                       }
                              }

                              private Connection connectToDatabaseOrDie() {
                                       Connection conn = null;
                                       try {
                                                
Class.forName("org.postgresql.Driver");
                                                String url = 
"jdbc:postgresql://10.43.34.144:5432/stream_producer";
                                                conn = 
DriverManager.getConnection(url, "stream", "stream");
                                                logger.info("DB connected 
.....");
                                        } catch (ClassNotFoundException e) {
                                                logger.error("Failed to 
establish DB connection - 1", e);
                                                System.exit(1);
                                        } catch (SQLException e) {
                                                logger.error("Failed to 
establish DB connection - 2", e);
                                                System.exit(2);
                                        }
                                        return conn;
                              }

                              @Override
                              public void close() {
                              }
                              @Override
                              public void ack(final long batchId) {
                              }

                              @Override
                              public void fail(Object id) {
                              }  

                              @Override
                              public void 
declareOutputFields(OutputFieldsDeclarer declarer) {
                                                declarer.declare(new 
Fields("sensor_id", "period", "powermon_num", "current", "id", 
"measurement_timestamp", "measurement_dateuploaded"));
                              }


                              public void activate() {
                                // TODO Auto-generated method stub
                              }

                              public void deactivate() {
                                // TODO Auto-generated method stub
                              }

                              public Map<String, Object> 
getComponentConfiguration() {
                                // TODO Auto-generated method stub
                                        return null;
                              }
             }

Then I build a Trident topology like this:
 // PostgresDBSpout declares a (boolean isDistributed) constructor, so the flag must be passed.
 topology.newStream("spoutInit", new PostgresDBSpout(true))
         .parallelismHint(6)
         .groupBy(new Fields("sensor_id"))
         .each(new Fields("sensor_id", "measurement_timestamp"),
               new PrintStream(),
               new Fields("sensorid", "timestamps"));
 

I just can’t make it work. Can anyone tell me what is wrong with this code?

thanks

AL

Reply via email to