Hi all, I am trying to implement a Trident spout that reads data from a database:
@SuppressWarnings("serial")
public static class PostgresDBSpout implements IBatchSpout {
private static final long serialVersionUID = 1L;
SpoutOutputCollector _collector;
boolean _isDistributed;
public PostgresDBSpout(boolean isDistributed) {
_isDistributed = isDistributed;
}
public boolean isDistributed() {
return true;
}
@Override
public void emitBatch(final long batchId, final
TridentCollector collector) {
_collector = collector ;
Connection conn = null ;
LinkedList listOfMeasurement = new
LinkedList() ;
conn = connectToDatabaseOrDie();
fetchMeasurementSample(conn,
listOfMeasurement);
Iterator it =
listOfMeasurement.iterator();
while (it.hasNext())
{
Measurement measurement =
(Measurement)it.next();
System.out.println("id: " +
measurement.id);
_collector.emit(new
Values(measurement.sensor_id, measurement.period, measurement.powermon_num,
measurement.current,
measurement.id, measurement.measurement_timestamp,
measurement.measurement_dateuploaded));
}
}
@Override
@SuppressWarnings("rawtypes")
public void open(final Map conf, final
TopologyContext context) {
}
private void fetchMeasurementSample(Connection
conn, LinkedList listOfMeasurement)
{
try
{
Statement st =
conn.createStatement();
ResultSet rs =
st.executeQuery("SELECT * FROM stream_producer.measurements_sample ORDER BY
date_uploaded asc");
while ( rs.next() )
{
Measurement measurement =
new Measurement();
measurement.sensor_id
= rs.getString("sensor_id");
measurement.period
= rs.getInt("period");
measurement.powermon_num
= rs.getLong("powermon_num") ;
measurement.current
= Float.parseFloat(rs.getString("current")) ;
measurement.id
= rs.getString("id") ;
measurement.measurement_timestamp =
rs.getString("measurement_timestamp") ;
measurement.measurement_dateuploaded =
rs.getString("measurement_dateuploaded") ;
listOfMeasurement.add(measurement);
}
rs.close();
st.close();
}
catch (SQLException se) {
logger.error("Threw a
SQLException creating the list of blogs.");
logger.error(se.getMessage());
}
}
private Connection connectToDatabaseOrDie() {
Connection conn = null;
try {
Class.forName("org.postgresql.Driver");
String url =
"jdbc:postgresql://10.43.34.144:5432/stream_producer";
conn =
DriverManager.getConnection(url, "stream", "stream");
logger.info("DB connected
.....");
} catch (ClassNotFoundException e) {
logger.error("Failed to
establish DB connection - 1", e);
System.exit(1);
} catch (SQLException e) {
logger.error("Failed to
establish DB connection - 2", e);
System.exit(2);
}
return conn;
}
@Override
public void close() {
}
@Override
public void ack(final long batchId) {
}
@Override
public void fail(Object id) {
}
@Override
public void
declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new
Fields("sensor_id", "period", "powermon_num", "current", "id",
"measurement_timestamp", "measurement_dateuploaded"));
}
public void activate() {
// TODO Auto-generated method stub
}
public void deactivate() {
// TODO Auto-generated method stub
}
public Map<String, Object>
getComponentConfiguration() {
// TODO Auto-generated method stub
return null;
}
}
Then I build a Trident topology like this:
// Builds the stream: read rows from the spout, group them by sensor, then run
// the PrintStream function on each (sensor_id, measurement_timestamp) pair.
// The isDistributed flag is passed explicitly to match the spout's
// PostgresDBSpout(boolean) constructor.
// NOTE(review): emitBatch reads the whole table on every call; with
// parallelismHint(6) each spout task presumably does the same, so rows may be
// emitted several times. Confirm the intended parallelism.
topology.newStream("spoutInit", new PostgresDBSpout(true))
        .parallelismHint(6)
        .groupBy(new Fields("sensor_id"))
        .each(new Fields("sensor_id", "measurement_timestamp"),
                new PrintStream(),
                new Fields("sensorid", "timestamps"));
I just can't make it work. Can anyone tell me what is wrong with this code?
thanks
AL
