Thanks for your response. But I am taking the input from the spout itself, and the two bolts contain different logic, so I cannot emit the tuple from the first bolt.
On Mon, Feb 1, 2016 at 11:17 PM, Nathan Leung <[email protected]> wrote: > You should wire the bolts one after the other, and the first will emit the > tuple to the second only when it has to. Don't use sleep, that's probably > not correct anyways. > On Jan 31, 2016 11:22 PM, "sujitha chinnu" <[email protected]> > wrote: > >> hai., >> My requirement is to first execute one bolt and upon successful >> execution only next bolt have to execute and i am giving the input for the >> bolts from same spout.For that for second bolt i am using Thread.sleep() >> method, its working fine but have performance issues.Can anyone help me if >> there is any alternative for this problem. >> >> Here is my sample code: >> >> Topology: >> >> public class Topology { >> >> ConnectionProvider cp; >> protected static final String JDBC_CONF = "jdbc.conf"; >> protected static final String TABLE_NAME = "users"; >> >> public static void main(String[] args) throws Exception{ >> String argument = args[0]; >> JdbcMapper jdbcMapper; >> TopologyBuilder builder = new TopologyBuilder(); >> Map map = Maps.newHashMap(); >> map.put("dataSourceClassName", "org.postgresql.ds.PGSimpleDataSource"); >> >> map.put("dataSource.url","jdbc:postgresql://localhost:5432/analysis?user=postgres"); >> ConnectionProvider cp = new MyConnectionProvider(map); >> >> jdbcMapper = new SimpleJdbcMapper(TABLE_NAME, cp); >> >> List<Column> schemaColumns = Lists.newArrayList(new Column("user_id", >> Types.INTEGER), new Column ("user_name",Types.VARCHAR),new >> Column("create_date", Types.TIMESTAMP)); >> >> JdbcMapper mapper = new SimpleJdbcMapper(schemaColumns); >> >> PsqlBolt userPersistanceBolt = new PsqlBolt(cp, mapper) >> .withInsertQuery("insert into user_details (id, user_name, >> created_timestamp) values (?,?,?)"); >> >> builder.setSpout("myspout", new UserSpout(), 1); >> >> builder.setBolt("Psql_Bolt", >> userPersistanceBolt,1).shuffleGrouping("myspout"); >> >> jdbcMapper = new SimpleJdbcMapper("My_details", cp); >> >> 
List<Column> schemaColumns1 = Lists.newArrayList(new Column("my_id", >> Types.INTEGER), new Column ("my_name",Types.VARCHAR)); >> >> JdbcMapper mapper1 = new SimpleJdbcMapper(schemaColumns1); >> >> PsqlBolt1 userPersistanceBolt1 = new PsqlBolt1(cp, mapper1) >> .withInsertQuery("insert into My_details (my_id, my_name) values (?,?)"); >> >> //builder.setSpout("myspout", new UserSpout(), 1); >> >> builder.setBolt("Psql_Bolt1", >> userPersistanceBolt1,1).shuffleGrouping("myspout"); >> Config conf = new Config(); >> conf.put(JDBC_CONF, map); >> conf.setDebug(true); >> conf.setNumWorkers(3); >> >> if (argument.equalsIgnoreCase("runLocally")) >> { System.out.println("Running topology locally..."); LocalCluster cluster >> = new LocalCluster(); cluster.submitTopology("Twitter Test >> Storm-postgresql", conf, builder.createTopology()); } >> >> else >> { System.out.println("Running topology on cluster..."); >> StormSubmitter.submitTopology("Topology_psql", conf, >> builder.createTopology()); } >> >> }} >> >> PsqlBolt: >> >> public class PsqlBolt extends AbstractJdbcBolt { >> private static final Logger LOG = Logger.getLogger(PsqlBolt.class); >> private String tableName; >> private String insertQuery; >> private JdbcMapper jdbcMapper; >> >> public PsqlBolt(ConnectionProvider connectionProvider, JdbcMapper >> jdbcMapper) >> { super(connectionProvider); this.jdbcMapper = jdbcMapper; } >> public PsqlBolt withTableName(String tableName) { this.tableName = >> tableName; return this; } >> >> public PsqlBolt withInsertQuery(String insertQuery) { this.insertQuery = >> insertQuery; System.out.println("query passsed....."); return this; } >> @Override >> public void prepare(Map map, TopologyContext topologyContext, >> OutputCollector collector) { >> super.prepare(map, topologyContext, collector); >> if(StringUtils.isBlank(tableName) && StringUtils.isBlank(insertQuery)) { >> throw new IllegalArgumentException("You must supply either a tableName or >> an insert Query."); } >> } >> >> 
@Override >> public void execute(Tuple tuple) { >> try { >> >> List<Column> columns = jdbcMapper.getColumns(tuple); >> List<List<Column>> columnLists = new ArrayList<List<Column>>(); >> columnLists.add(columns); >> if(!StringUtils.isBlank(tableName)) { >> this.jdbcClient.insert(this.tableName, columnLists); } else { >> this.jdbcClient.executeInsertQuery(this.insertQuery, columnLists); } >> this.collector.ack(tuple); >> } catch (Exception e) { this.collector.reportError(e); >> this.collector.fail(tuple); } >> } >> >> @Override >> public void declareOutputFields(OutputFieldsDeclarer >> outputFieldsDeclarer) { >> >> }} >> >> >> PsqlBolt1: >> >> public class PsqlBolt1 extends AbstractJdbcBolt { >> private static final Logger LOG = Logger.getLogger(PsqlBolt1.class); >> >> private String tableName; >> private String insertQuery; >> private JdbcMapper jdbcMapper; >> >> public PsqlBolt1(ConnectionProvider connectionProvider, JdbcMapper >> jdbcMapper) { super(connectionProvider); this.jdbcMapper = jdbcMapper; } >> >> public PsqlBolt1 withInsertQuery(String insertQuery) >> { this.insertQuery = insertQuery; System.out.println("query >> passsed....."); return this; } >> >> @Override >> public void prepare(Map map, TopologyContext topologyContext, >> OutputCollector collector) { >> super.prepare(map, topologyContext, collector); >> if(StringUtils.isBlank(tableName) && StringUtils.isBlank(insertQuery)) >> { throw new IllegalArgumentException("You must supply either a tableName >> or an insert Query."); } >> >> } >> >> @Override >> public void execute(Tuple tuple) { >> try { >> Thread.sleep(1000); >> List<Column> columns = jdbcMapper.getColumns(tuple); >> List<List<Column>> columnLists = new ArrayList<List<Column>>(); >> columnLists.add(columns); >> if(!StringUtils.isBlank(tableName)) >> { this.jdbcClient.insert(this.tableName, columnLists); } >> >> else >> { this.jdbcClient.executeInsertQuery(this.insertQuery, columnLists); } >> >> this.collector.ack(tuple); >> } catch 
(Exception e) >> { this.collector.reportError(e); this.collector.fail(tuple); } >> >> } >> >> @Override >> public void declareOutputFields(OutputFieldsDeclarer >> outputFieldsDeclarer) { >> >> }} >> >
