Why can't the first bolt emit the data it received from the spout in the tuple it sends to the second bolt? Your current strategy has very little chance of success once you scale to more than one machine (you would have to craft the topology very carefully to make it work), and sleeping to synchronize the data will lead to problems. This is easily solved by putting bolt 2 after bolt 1 in the topology.
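Something along these lines would do it. This is an untested sketch, not your actual code: the forwarded field names ("my_id", "my_name") and the Storm 0.10.x package names (backtype.storm.*) are assumptions you would adapt to whatever your spout really emits and whatever Storm version you run.

import java.util.Map;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

// Bolt 1 does its own work first, then forwards the spout's fields
// downstream only when that work succeeds. The emit is anchored to the
// input tuple, so a failure anywhere downstream replays the whole chain.
public class ForwardingBolt extends BaseRichBolt {
    private OutputCollector collector;

    @Override
    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple tuple) {
        try {
            // ... bolt 1's real logic goes here (e.g. the first JDBC insert) ...

            // Success: pass the same values on, anchored to the input tuple.
            collector.emit(tuple, new Values(tuple.getValueByField("my_id"),
                                             tuple.getValueByField("my_name")));
            collector.ack(tuple);
        } catch (Exception e) {
            collector.reportError(e);
            collector.fail(tuple);  // failure: nothing is emitted, so bolt 2 never runs
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("my_id", "my_name"));
    }
}

Then the only wiring change is to subscribe the second bolt to the first instead of to the spout:

builder.setBolt("Psql_Bolt", userPersistanceBolt, 1).shuffleGrouping("myspout");
builder.setBolt("Psql_Bolt1", userPersistanceBolt1, 1).shuffleGrouping("Psql_Bolt");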
If that's completely infeasible for whatever reason, then you should merge the two bolts into one (there is a sketch of that option below the quoted thread).

On Feb 2, 2016 2:04 AM, "sujitha chinnu" <[email protected]> wrote:

> Thanks for your response. But I am taking input from the spout itself,
> and the two bolts contain different logic, so I cannot emit the tuple
> from the first bolt.
>
> On Mon, Feb 1, 2016 at 11:17 PM, Nathan Leung <[email protected]> wrote:
>
>> You should wire the bolts one after the other, and the first will emit
>> the tuple to the second only when it has to. Don't use sleep; that's
>> probably not correct anyway.
>>
>> On Jan 31, 2016 11:22 PM, "sujitha chinnu" <[email protected]> wrote:
>>
>>> Hi,
>>> My requirement is to execute one bolt first, and only upon its
>>> successful execution should the next bolt execute; I am giving the
>>> input for both bolts from the same spout. For the second bolt I am
>>> using the Thread.sleep() method. It works, but it has performance
>>> issues. Can anyone help me if there is an alternative?
>>>
>>> Here is my sample code:
>>>
>>> Topology:
>>>
>>> public class Topology {
>>>
>>>     protected static final String JDBC_CONF = "jdbc.conf";
>>>     protected static final String TABLE_NAME = "users";
>>>
>>>     public static void main(String[] args) throws Exception {
>>>         String argument = args[0];
>>>         TopologyBuilder builder = new TopologyBuilder();
>>>
>>>         Map map = Maps.newHashMap();
>>>         map.put("dataSourceClassName", "org.postgresql.ds.PGSimpleDataSource");
>>>         map.put("dataSource.url", "jdbc:postgresql://localhost:5432/analysis?user=postgres");
>>>         ConnectionProvider cp = new MyConnectionProvider(map);
>>>
>>>         JdbcMapper jdbcMapper = new SimpleJdbcMapper(TABLE_NAME, cp);
>>>
>>>         List<Column> schemaColumns = Lists.newArrayList(
>>>                 new Column("user_id", Types.INTEGER),
>>>                 new Column("user_name", Types.VARCHAR),
>>>                 new Column("create_date", Types.TIMESTAMP));
>>>         JdbcMapper mapper = new SimpleJdbcMapper(schemaColumns);
>>>
>>>         PsqlBolt userPersistanceBolt = new PsqlBolt(cp, mapper)
>>>                 .withInsertQuery("insert into user_details (id, user_name, created_timestamp) values (?,?,?)");
>>>
>>>         builder.setSpout("myspout", new UserSpout(), 1);
>>>         builder.setBolt("Psql_Bolt", userPersistanceBolt, 1).shuffleGrouping("myspout");
>>>
>>>         jdbcMapper = new SimpleJdbcMapper("My_details", cp);
>>>
>>>         List<Column> schemaColumns1 = Lists.newArrayList(
>>>                 new Column("my_id", Types.INTEGER),
>>>                 new Column("my_name", Types.VARCHAR));
>>>         JdbcMapper mapper1 = new SimpleJdbcMapper(schemaColumns1);
>>>
>>>         PsqlBolt1 userPersistanceBolt1 = new PsqlBolt1(cp, mapper1)
>>>                 .withInsertQuery("insert into My_details (my_id, my_name) values (?,?)");
>>>
>>>         //builder.setSpout("myspout", new UserSpout(), 1);
>>>         builder.setBolt("Psql_Bolt1", userPersistanceBolt1, 1).shuffleGrouping("myspout");
>>>
>>>         Config conf = new Config();
>>>         conf.put(JDBC_CONF, map);
>>>         conf.setDebug(true);
>>>         conf.setNumWorkers(3);
>>>
>>>         if (argument.equalsIgnoreCase("runLocally")) {
>>>             System.out.println("Running topology locally...");
>>>             LocalCluster cluster = new LocalCluster();
>>>             cluster.submitTopology("Twitter Test Storm-postgresql", conf, builder.createTopology());
>>>         } else {
>>>             System.out.println("Running topology on cluster...");
>>>             StormSubmitter.submitTopology("Topology_psql", conf, builder.createTopology());
>>>         }
>>>     }
>>> }
>>>
>>> PsqlBolt:
>>>
>>> public class PsqlBolt extends AbstractJdbcBolt {
>>>     private static final Logger LOG = Logger.getLogger(PsqlBolt.class);
>>>
>>>     private String tableName;
>>>     private String insertQuery;
>>>     private JdbcMapper jdbcMapper;
>>>
>>>     public PsqlBolt(ConnectionProvider connectionProvider, JdbcMapper jdbcMapper) {
>>>         super(connectionProvider);
>>>         this.jdbcMapper = jdbcMapper;
>>>     }
>>>
>>>     public PsqlBolt withTableName(String tableName) {
>>>         this.tableName = tableName;
>>>         return this;
>>>     }
>>>
>>>     public PsqlBolt withInsertQuery(String insertQuery) {
>>>         this.insertQuery = insertQuery;
>>>         System.out.println("query passed.....");
>>>         return this;
>>>     }
>>>
>>>     @Override
>>>     public void prepare(Map map, TopologyContext topologyContext, OutputCollector collector) {
>>>         super.prepare(map, topologyContext, collector);
>>>         if (StringUtils.isBlank(tableName) && StringUtils.isBlank(insertQuery)) {
>>>             throw new IllegalArgumentException("You must supply either a tableName or an insert query.");
>>>         }
>>>     }
>>>
>>>     @Override
>>>     public void execute(Tuple tuple) {
>>>         try {
>>>             List<Column> columns = jdbcMapper.getColumns(tuple);
>>>             List<List<Column>> columnLists = new ArrayList<List<Column>>();
>>>             columnLists.add(columns);
>>>             if (!StringUtils.isBlank(tableName)) {
>>>                 this.jdbcClient.insert(this.tableName, columnLists);
>>>             } else {
>>>                 this.jdbcClient.executeInsertQuery(this.insertQuery, columnLists);
>>>             }
>>>             this.collector.ack(tuple);
>>>         } catch (Exception e) {
>>>             this.collector.reportError(e);
>>>             this.collector.fail(tuple);
>>>         }
>>>     }
>>>
>>>     @Override
>>>     public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
>>>     }
>>> }
>>>
>>> PsqlBolt1:
>>>
>>> public class PsqlBolt1 extends AbstractJdbcBolt {
>>>     private static final Logger LOG = Logger.getLogger(PsqlBolt1.class);
>>>
>>>     private String tableName;
>>>     private String insertQuery;
>>>     private JdbcMapper jdbcMapper;
>>>
>>>     public PsqlBolt1(ConnectionProvider connectionProvider, JdbcMapper jdbcMapper) {
>>>         super(connectionProvider);
>>>         this.jdbcMapper = jdbcMapper;
>>>     }
>>>
>>>     public PsqlBolt1 withInsertQuery(String insertQuery) {
>>>         this.insertQuery = insertQuery;
>>>         System.out.println("query passed.....");
>>>         return this;
>>>     }
>>>
>>>     @Override
>>>     public void prepare(Map map, TopologyContext topologyContext, OutputCollector collector) {
>>>         super.prepare(map, topologyContext, collector);
>>>         if (StringUtils.isBlank(tableName) && StringUtils.isBlank(insertQuery)) {
>>>             throw new IllegalArgumentException("You must supply either a tableName or an insert query.");
>>>         }
>>>     }
>>>
>>>     @Override
>>>     public void execute(Tuple tuple) {
>>>         try {
>>>             Thread.sleep(1000);
>>>             List<Column> columns = jdbcMapper.getColumns(tuple);
>>>             List<List<Column>> columnLists = new ArrayList<List<Column>>();
>>>             columnLists.add(columns);
>>>             if (!StringUtils.isBlank(tableName)) {
>>>                 this.jdbcClient.insert(this.tableName, columnLists);
>>>             } else {
>>>                 this.jdbcClient.executeInsertQuery(this.insertQuery, columnLists);
>>>             }
>>>             this.collector.ack(tuple);
>>>         } catch (Exception e) {
>>>             this.collector.reportError(e);
>>>             this.collector.fail(tuple);
>>>         }
>>>     }
>>>
>>>     @Override
>>>     public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
>>>     }
>>> }
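For the merge option mentioned above, here is a minimal sketch, again untested and with assumed names (MergedPsqlBolt, the two mappers, and the two query fields are placeholders; the storm-jdbc calls mirror the ones already in your bolts). A single bolt runs both inserts in order inside one execute(), so the second insert happens only if the first one did not throw.

import java.util.Collections;
import java.util.List;
import java.util.Map;

import org.apache.storm.jdbc.bolt.AbstractJdbcBolt;
import org.apache.storm.jdbc.common.Column;
import org.apache.storm.jdbc.common.ConnectionProvider;
import org.apache.storm.jdbc.mapper.JdbcMapper;

import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;

// Hypothetical merge of PsqlBolt and PsqlBolt1 into one bolt. The inherited
// AbstractJdbcBolt.prepare() sets up jdbcClient and collector as before.
public class MergedPsqlBolt extends AbstractJdbcBolt {
    private final JdbcMapper userMapper;  // maps the tuple for user_details
    private final JdbcMapper myMapper;    // maps the tuple for My_details
    private final String userInsertQuery;
    private final String myInsertQuery;

    public MergedPsqlBolt(ConnectionProvider cp,
                          JdbcMapper userMapper, String userInsertQuery,
                          JdbcMapper myMapper, String myInsertQuery) {
        super(cp);
        this.userMapper = userMapper;
        this.userInsertQuery = userInsertQuery;
        this.myMapper = myMapper;
        this.myInsertQuery = myInsertQuery;
    }

    @Override
    public void execute(Tuple tuple) {
        try {
            // First insert (old PsqlBolt logic).
            List<Column> userColumns = userMapper.getColumns(tuple);
            jdbcClient.executeInsertQuery(userInsertQuery, Collections.singletonList(userColumns));

            // Second insert (old PsqlBolt1 logic); only reached if the first did not throw.
            List<Column> myColumns = myMapper.getColumns(tuple);
            jdbcClient.executeInsertQuery(myInsertQuery, Collections.singletonList(myColumns));

            collector.ack(tuple);
        } catch (Exception e) {
            collector.reportError(e);
            collector.fail(tuple);  // on replay, both inserts run again
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
    }
}

The trade-off is that you can no longer scale or tune the two inserts independently, but the ordering you want is then guaranteed per tuple with no sleeping at all.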
