Here I found that if I use a RandomTupleSpout, which implements IBatchSpout, to
replace OpaqueTridentKafkaSpout,

 static class RandomTupleSpout implements IBatchSpout {
     private transient Random random;
     private static final int BATCH = 1000;

     @Override
     @SuppressWarnings("rawtypes")
     public void open(final Map conf, final TopologyContext context) {
         random = new Random();
     }

     @Override
     public void emitBatch(final long batchId, final TridentCollector collector) {
         // emit BATCH two-field tuples: (batchid, word)
         for (int i = 0; i < BATCH; i++) {
             collector.emit(new Values(i, "test string inserted into this table"));
         }
     }

     @Override
     public void ack(final long batchId) {}

     @Override
     public void close() {}

     @Override
     @SuppressWarnings("rawtypes")
     public Map getComponentConfiguration() {
         return null;
     }

     @Override
     public Fields getOutputFields() {
         return new Fields("batchid", "word");
     }
 }


I can detect batch tuples in the TridentState, but with
OpaqueTridentKafkaSpout it seems I only receive one row per batch.

thanks

Alec

On Fri, Dec 12, 2014 at 10:47 AM, Sa Li <[email protected]> wrote:

> Hi, Taylor
>
> Thank you very much. I actually read your book, Storm Blueprints: Patterns
> for Distributed Real-time Computation, an awesome book. I am not a Java
> programmer, but I am trying my best to write the Storm code in Java. I
> implemented a Trident state based on
> https://github.com/geoforce/storm-postgresql. I walked through the
> Trident-state link you mentioned and think I understand the state part.
>
> Here is the updater function:
>  static class EventUpdater implements ReducerAggregator<List<String>> {
>      @Override
>      public List<String> init() {
>          return null;
>      }
>
>      @Override
>      public List<String> reduce(List<String> curr, TridentTuple tuple) {
>          List<String> updated = null;
>          if (curr == null) {
>              String event = (String) tuple.getValue(1);
>              updated = Lists.newArrayList(event);
>          } else {
>              updated = curr;
>          }
>          System.out.println(updated);
>          return updated;
>      }
>  }
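[Editor's note: a standalone fold in plain Java (hypothetical event strings, no Storm types) shows what this reduce does over a batch: once curr is non-null, every later event is discarded, so each grouped key ends the batch holding only its first event. That behavior would be consistent with seeing a single row per key downstream.]

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ReduceSketch {
    // Mirrors EventUpdater.reduce: keep curr once it exists, otherwise wrap the event.
    static List<String> reduce(List<String> curr, String event) {
        if (curr == null) {
            return new ArrayList<>(Arrays.asList(event));
        }
        return curr; // later events in the batch are dropped
    }

    public static void main(String[] args) {
        List<String> state = null; // init() returns null
        for (String event : Arrays.asList("login", "click", "logout")) {
            state = reduce(state, event);
        }
        // Only the first event of the batch survives.
        System.out.println(state);
    }
}
```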
>
> In the State, I do the following:
>
> PostgresqlState(final PostgresqlStateConfig config) {
>     this.config = config;
>     try {
>         Class.forName("org.postgresql.Driver");
>         connection = DriverManager.getConnection(config.getUrl(), "sali", "sali");
>     } catch (ClassNotFoundException e) {
>         logger.error("Failed to establish DB connection", e);
>         System.exit(1);
>     } catch (SQLException e) {
>         logger.error("Failed to establish DB connection", e);
>         System.exit(2);
>     }
> }
>
> public static Factory newFactory(final PostgresqlStateConfig config) {
>     return new Factory(config);
> }
>
> // I don't do anything in multiGet, since I don't need to compare the keys
> // against the DB or retrieve anything from it
> @Override
> public List<T> multiGet(final List<List<Object>> keys) {
>     final List<T> result = new ArrayList<>();
>     for (final List<Object> key : keys) {
>         result.add((T) null);
>     }
>     return result;
> }
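[Editor's note: one contract worth keeping in mind, sketched standalone in plain Java (no Storm types): multiGet should return exactly one entry per key list, in order, which is what the null-filling loop above does.]

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class MultiGetSketch {
    // One (here: null) result per key, preserving order, as the caller expects.
    static List<Object> multiGet(List<List<Object>> keys) {
        List<Object> result = new ArrayList<>();
        for (List<Object> key : keys) {
            result.add(null);
        }
        return result;
    }

    public static void main(String[] args) {
        List<List<Object>> keys = Arrays.asList(
                Arrays.<Object>asList(1, "login"),
                Arrays.<Object>asList(2, "click"));
        List<Object> got = multiGet(keys);
        System.out.println(got.size()); // must equal keys.size()
    }
}
```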
>
> Next, I wrote two multiPut implementations to store the Kafka messages, one
> using the COPY command, the other using a multi-row INSERT.
>
> // COPY command
> @Override
> public void multiPut(final List<List<Object>> keys, final List<T> values) {
>     System.out.println("multiPut start ..... ");
>     System.out.println(keys);   // I want to print out the batch keys
>
>     final Iterator<List<List<Object>>> partitionedKeys =
>             Lists.partition(keys, config.getBatchSize()).iterator();
>     final Iterator<List<T>> partitionedValues =
>             Lists.partition(values, config.getBatchSize()).iterator();
>     while (partitionedKeys.hasNext() && partitionedValues.hasNext()) {
>         final List<List<Object>> pkeys = partitionedKeys.next();
>         final List<T> pvalues = partitionedValues.next();
>
>         final StringBuilder copyQueryBuilder = new StringBuilder()
>                 .append("COPY ")
>                 .append(config.getTable())
>                 .append("(")
>                 .append(buildColumns())
>                 .append(") FROM STDIN WITH DELIMITER '|'");
>
>         final List<String> combineRowList = new ArrayList<>();
>         byte[] combineRowBytes;
>         for (int i = 0; i < pkeys.size(); i++) {
>             // one COPY row per tuple: join every part of the compound key
>             // with '|', then append the value
>             final StringBuilder row = new StringBuilder();
>             for (int j = 0; j < pkeys.get(i).size(); j++) {
>                 row.append(pkeys.get(i).get(j)).append("|");
>             }
>             row.append(pvalues.get(i)).append("\n");
>             combineRowList.add(row.toString());
>         }
>
>         final ByteArrayOutputStream baos = new ByteArrayOutputStream();
>         final DataOutputStream out = new DataOutputStream(baos);
>         for (String element : combineRowList) {
>             try {
>                 out.write(element.getBytes());
>             } catch (Exception ex) {
>                 logger.error("Problem in createByteArray", ex);
>             }
>         }
>         combineRowBytes = baos.toByteArray();
>         try {
>             CopyManager cm = new CopyManager((BaseConnection) connection);
>             CopyIn cpIN = cm.copyIn(copyQueryBuilder.toString());
>             cpIN.writeToCopy(combineRowBytes, 0, combineRowBytes.length);
>             cpIN.endCopy();
>         } catch (final SQLException e) {
>             logger.error("Multiput update failed", e);
>         }
>         // do not close the shared connection here: closing it after the
>         // first partition would break every later batch
>
>         logger.debug(String.format("%1$d keys flushed", pkeys.size()));
>     }
> }
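[Editor's note: the row layout that COPY ... DELIMITER '|' expects can be checked standalone in plain Java with hypothetical key/value data: each tuple should become one line whose fields match the column list, i.e. all key parts followed by the value.]

```java
import java.util.Arrays;
import java.util.List;

public class CopyRowSketch {
    // Build one '|'-delimited COPY line per tuple: key parts first, value last.
    static String buildRow(List<Object> keyParts, String value) {
        StringBuilder row = new StringBuilder();
        for (Object part : keyParts) {
            row.append(part).append('|');
        }
        return row.append(value).toString();
    }

    public static void main(String[] args) {
        // a grouped key of (userid, event) plus the aggregated value
        List<Object> key = Arrays.<Object>asList(1000, "login");
        System.out.println(buildRow(key, "[login]"));
    }
}
```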
>
> // another multiPut, using a multi-row INSERT
> @Override
> public void multiPut(final List<List<Object>> keys, final List<T> values) {
>     System.out.println("multiPut start ..... ");
>     System.out.println(keys);   // I want to print out the batch keys
>
>     final Iterator<List<List<Object>>> partitionedKeys =
>             Lists.partition(keys, config.getBatchSize()).iterator();
>     final Iterator<List<T>> partitionedValues =
>             Lists.partition(values, config.getBatchSize()).iterator();
>     while (partitionedKeys.hasNext() && partitionedValues.hasNext()) {
>         final List<List<Object>> pkeys = partitionedKeys.next();
>         final List<T> pvalues = partitionedValues.next();
>         int paramCount = 0;
>         switch (config.getType()) {   // intentional fall-through
>             case OPAQUE:
>                 paramCount += config.getValueColumns().length;
>             case TRANSACTIONAL:
>                 paramCount += 1;
>             default:
>                 paramCount += (config.getKeyColumns().length
>                         + config.getValueColumns().length);
>         }
>         final StringBuilder queryBuilder = new StringBuilder()
>                 .append("WITH new_values (")
>                 .append(buildColumns())
>                 .append(") AS (")
>                 .append("VALUES ")
>                 .append(Joiner.on(", ").join(repeat(
>                         "(" + Joiner.on(",").join(repeat("?", paramCount)) + ")",
>                         pkeys.size())))
>                 .append(") ")   // trailing space needed before INSERT
>                 .append("INSERT INTO ").append(config.getTable())
>                 .append("(").append(buildColumns()).append(") ")
>                 .append("SELECT ").append(buildColumns()).append(" ")
>                 .append("FROM new_values");
>
>         final List<Object> params = flattenPutParams(pkeys, pvalues);
>         PreparedStatement ps = null;
>         int i = 0;
>         try {
>             ps = connection.prepareStatement(queryBuilder.toString());
>             for (final Object param : params) {
>                 ps.setObject(++i, param);
>             }
>             ps.execute();
>         } catch (final SQLException e) {
>             logger.error("Multiput update failed", e);
>         } finally {
>             if (ps != null) {
>                 try {
>                     ps.close();
>                 } catch (SQLException e) {
>                     // ignore close failure
>                 }
>             }
>         }
>
>         logger.debug(String.format("%1$d keys flushed", pkeys.size()));
>     }
> }
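[Editor's note: the placeholder string that the Joiner/repeat pair builds can be sketched standalone in plain Java, without Guava; the repeat helper is assumed to return n copies of its argument.]

```java
import java.util.ArrayList;
import java.util.List;

public class PlaceholderSketch {
    // n copies of s, as the repeat(...) helper above is assumed to behave.
    static List<String> repeat(String s, int n) {
        List<String> out = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            out.add(s);
        }
        return out;
    }

    public static void main(String[] args) {
        int paramCount = 3; // e.g. userid, event, eventword
        int rows = 2;       // tuples in this partition
        String group = "(" + String.join(",", repeat("?", paramCount)) + ")";
        String placeholders = String.join(", ", repeat(group, rows));
        // one "(?,?,?)" group per row in the VALUES clause
        System.out.println(placeholders);
    }
}
```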
>
>
> This is the topology:
> topology.newStream("topictestspout", kafkaSpout)
>         .each(new Fields("str"), new JsonObjectParse(), new Fields("userid", "event"))
>         .groupBy(new Fields("userid", "event"))
>         .persistentAggregate(PostgresqlState.newFactory(config),
>                 new Fields("userid", "event"), new EventUpdater(),
>                 new Fields("eventword"));
>
>
>
> Running the code above, I can write into the DB, but it seems only the first
> row of each batch gets written (I could be wrong): I print the keys at the
> beginning of multiPut, and it should be a list with multiple elements, but I
> only ever see a one-element list, like [[1000]]. Sorry to send you so much
> code; reading it may be tedious, but I hope you can help me diagnose the
> problem.
>
>
> thanks
>
> Alec
>
> On Wed, Dec 10, 2014 at 10:32 AM, P. Taylor Goetz <[email protected]>
> wrote:
>
>> Alec,
>>
>> For writing to a database in Trident, I would suggest implementing a
>> Trident State. That will allow you to process batches of tuples in bulk.
>> Trident functions operate on individual tuples, so writing to a database
>> in a function will result in one insert for every tuple (inefficient).
>>
>> You can find more information on Trident State here:
>> https://storm.apache.org/documentation/Trident-state
>>
>> -Taylor
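[Editor's note: to make the point concrete, a standalone sketch in plain Java (hypothetical column names, no JDBC connection) contrasts one statement per tuple with one multi-row statement per batch.]

```java
import java.util.Arrays;
import java.util.List;

public class BatchInsertSketch {
    public static void main(String[] args) {
        List<String[]> batch = Arrays.asList(
                new String[]{"1", "login"},
                new String[]{"2", "click"},
                new String[]{"3", "logout"});

        // Per-tuple (what a Trident function would do): one statement each.
        int perTupleStatements = batch.size();

        // Per-batch (what a Trident State can do): one multi-row statement.
        StringBuilder sql = new StringBuilder("INSERT INTO test.state(userid, event) VALUES ");
        for (int i = 0; i < batch.size(); i++) {
            if (i > 0) sql.append(", ");
            sql.append("(?, ?)"); // parameters bound later via PreparedStatement
        }
        System.out.println(perTupleStatements + " statements vs 1: " + sql);
    }
}
```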
>>
>> On Dec 5, 2014, at 5:51 PM, Sa Li <[email protected]> wrote:
>>
>> Hi, Dmytro
>>
>> Thanks for the reply. I am using a Trident topology; here is my static
>> class, but I think it is incorrect, since Trident processes data in
>> batches while the following code processes a single row at a time.
>>
>> public static class WriteDB extends BaseFunction {
>>     // NOTE: conn is never initialized here; it would have to be opened in
>>     // a prepare() method, otherwise execute() throws a NullPointerException
>>     private Connection conn = null;
>>     PreparedStatement ps = null;
>>
>>     @Override
>>     public final void execute(final TridentTuple tuple, final TridentCollector collector) {
>>         int user = tuple.getInteger(0);
>>         String value = tuple.getString(1);
>>         // parameterized instead of string concatenation, to avoid quoting
>>         // and SQL-injection problems
>>         final String query = "INSERT INTO test.state(userid, event) VALUES (?, ?)";
>>         System.out.println(query);
>>         try {
>>             ps = conn.prepareStatement(query);
>>             ps.setInt(1, user);
>>             ps.setString(2, value);
>>             ps.execute();
>>             collector.emit(new Values(tuple.getStringByField("event")));
>>         } catch (SQLException ex) {
>>             System.err.println("Caught SQLException: " + ex.getMessage());
>>         } finally {
>>             if (ps != null) {
>>                 try {
>>                     ps.close();
>>                 } catch (SQLException ex) {
>>                     // ignore close failure
>>                 }
>>             }
>>         }
>>     }
>> }
>>
>>
>> Any idea?
>>
>> thanks
>>
>> On Fri, Dec 5, 2014 at 2:35 PM, Dima Dragan <[email protected]>
>> wrote:
>>
>>> Hi,
>>>
>>> I think a better approach is to create a static class with thread-safe
>>> singleton initialization of a connection pool (for example, I use HikariCP
>>> for SQL databases and JedisPool for Redis) and provide public access so
>>> bolts can get connections from it. Initialize the pool in prepare(); in
>>> execute(), take a connection, run your query, and give the connection back.
>>>
>>> That way every worker gets its own pool, which is shared between its bolts.
>>>
>>> Best regards,
>>> Dmytro Dragan
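[Editor's note: the thread-safe singleton initialization Dmytro describes can be sketched with the holder idiom in plain Java; the pool type here is a hypothetical stand-in, not HikariCP.]

```java
public class PoolHolder {
    // Hypothetical stand-in for a real pool such as HikariCP's HikariDataSource.
    static class ConnectionPool {}

    // Holder idiom: the JVM initializes Holder lazily and thread-safely on
    // the first call to PoolHolder.get().
    private static class Holder {
        static final ConnectionPool INSTANCE = new ConnectionPool();
    }

    public static ConnectionPool get() {
        return Holder.INSTANCE;
    }

    public static void main(String[] args) {
        // every bolt in the worker sees the same pool instance
        System.out.println(get() == get());
    }
}
```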
>>> On Dec 5, 2014 11:57 PM, "Sa Li" <[email protected]> wrote:
>>>
>>>> Hi, all
>>>>
>>>> Right now I am able to write tuples into a file and use the COPY command
>>>> to load them into the database, but this is obviously not a perfect
>>>> solution; it adds complexity and overhead. Any ideas for making it simpler?
>>>>
>>>>
>>>> thanks
>>>>
>>>>
>>>> Alec
>>>>
>>>
>>
>>
>
