I found that if I use a RandomTupleSpout (which implements IBatchSpout) in place of OpaqueTridentKafkaSpout:
static class RandomTupleSpout implements IBatchSpout {
    private transient Random random;
    private static final int BATCH = 1000;

    @Override
    @SuppressWarnings("rawtypes")
    public void open(final Map conf, final TopologyContext context) {
        random = new Random();
    }

    @Override
    public void emitBatch(final long batchId, final TridentCollector collector) {
        // emit BATCH two-field tuples (batchid, word)
        for (int i = 0; i < BATCH; i++) {
            collector.emit(new Values(i, "test string inserted into this table"));
        }
    }

    @Override
    public void ack(final long batchId) {}

    @Override
    public void close() {}

    @Override
    @SuppressWarnings("rawtypes")
    public Map getComponentConfiguration() {
        return null;
    }

    @Override
    public Fields getOutputFields() {
        return new Fields("batchid", "word");
    }
}
I can detect batches of tuples in the TridentState, but with
OpaqueTridentKafkaSpout it seems I only receive one row per batch.
thanks
Alec
On Fri, Dec 12, 2014 at 10:47 AM, Sa Li <[email protected]> wrote:
> Hi, Taylor
>
> Thank you very much. I actually read your book, Storm Blueprints:
> Patterns for Distributed Real-time Computation, awesome book. I am not a
> Java programmer, but I try my best to write Storm code in Java. I
> actually implemented a Trident state based on
> https://github.com/geoforce/storm-postgresql. I walked through the
> Trident-state link you mentioned and think I understand the state part.
>
> Here is the updater function:
>
> static class EventUpdater implements ReducerAggregator<List<String>> {
>     @Override
>     public List<String> init() {
>         return null;
>     }
>
>     @Override
>     public List<String> reduce(List<String> curr, TridentTuple tuple) {
>         List<String> updated = null;
>         if (curr == null) {
>             String event = (String) tuple.getValue(1);
>             updated = Lists.newArrayList(event);
>         } else {
>             updated = curr;
>         }
>         System.out.println(updated);
>         return updated;
>     }
> }
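Note that the reduce above returns curr unchanged once it is non-null, so only the first tuple of each batch contributes an event. The same fold can be traced outside Storm with a minimal plain-Java sketch (no Storm types; tuples are simulated as strings):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ReduceTrace {
    // Same fold as EventUpdater.reduce: keep curr once it is non-null.
    static List<String> reduce(List<String> curr, String event) {
        if (curr == null) {
            return new ArrayList<>(Arrays.asList(event));
        }
        return curr; // later events in the batch are dropped
    }

    public static void main(String[] args) {
        List<String> curr = null; // init() returns null
        for (String event : Arrays.asList("e1", "e2", "e3")) {
            curr = reduce(curr, event);
        }
        System.out.println(curr); // only the first event survives
    }
}
```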
>
> In the State, I do the following:
>
> PostgresqlState(final PostgresqlStateConfig config) {
>     this.config = config;
>     try {
>         Class.forName("org.postgresql.Driver");
>         connection = DriverManager.getConnection(config.getUrl(), "sali", "sali");
>     } catch (ClassNotFoundException e) {
>         logger.error("Failed to establish DB connection", e);
>         System.exit(1);
>     } catch (SQLException e) {
>         logger.error("Failed to establish DB connection", e);
>         System.exit(2);
>     }
> }
>
> public static Factory newFactory(final PostgresqlStateConfig config) {
>     return new Factory(config);
> }
>
> // multiGet is a no-op, since I don't need to compare keys against the DB
> // or retrieve anything from it
> @Override
> public List<T> multiGet(final List<List<Object>> keys) {
>     final List<T> result = new ArrayList<>();
>     for (final List<Object> key : keys) {
>         result.add((T) null);
>     }
>     return result;
> }
>
> Next, I wrote two versions of multiPut to store the Kafka messages: one
> using the COPY command, the other using a multi-row INSERT.
>
> // COPY command
> @Override
> public void multiPut(final List<List<Object>> keys, final List<T> values) {
>     System.out.println("multiPut start ..... ");
>     System.out.println(keys); // print out the batch keys
>
>     final Iterator<List<List<Object>>> partitionedKeys =
>             Lists.partition(keys, config.getBatchSize()).iterator();
>     final Iterator<List<T>> partitionedValues =
>             Lists.partition(values, config.getBatchSize()).iterator();
>     while (partitionedKeys.hasNext() && partitionedValues.hasNext()) {
>         final List<List<Object>> pkeys = partitionedKeys.next();
>         final List<T> pvalues = partitionedValues.next();
>
>         final StringBuilder copyQueryBuilder = new StringBuilder()
>                 .append("COPY ")
>                 .append(config.getTable())
>                 .append("(")
>                 .append(buildColumns())
>                 .append(") FROM STDIN WITH DELIMITER '|'");
>
>         final List<String> combineRowList = new ArrayList<>();
>         byte[] combineRowBytes;
>         for (int i = 0; i < pkeys.size(); i++) {
>             for (int j = 0; j < pkeys.get(i).size(); j++) {
>                 String row = pkeys.get(i).get(j) + "|" + pvalues.get(i) + "\n";
>                 combineRowList.add(row);
>             }
>         }
>
>         ByteArrayOutputStream baos = new ByteArrayOutputStream();
>         DataOutputStream out = new DataOutputStream(baos);
>         for (String element : combineRowList) {
>             try {
>                 out.write(element.getBytes());
>             } catch (Exception ex) {
>                 logger.error("Problem in createByteArray", ex);
>             }
>         }
>         combineRowBytes = baos.toByteArray();
>
>         try {
>             CopyManager cm = new CopyManager((BaseConnection) connection);
>             CopyIn cpIN = cm.copyIn(copyQueryBuilder.toString());
>             cpIN.writeToCopy(combineRowBytes, 0, combineRowBytes.length);
>             cpIN.endCopy();
>         } catch (final SQLException e) {
>             logger.error("Multiput update failed", e);
>         } finally {
>             try {
>                 if (connection != null) {
>                     connection.close();
>                 }
>             } catch (SQLException e) {
>                 logger.error("Multiput update failed", e);
>             }
>         }
>
>         logger.debug(String.format("%1$d keys flushed", pkeys.size()));
>     }
> }
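The row-assembly part of the COPY path can be exercised on its own. A minimal plain-Java sketch (no JDBC; the '|' delimiter matches the COPY statement above, and the method name is hypothetical):

```java
import java.util.Arrays;
import java.util.List;

public class CopyPayload {
    // Join each key with its value using the COPY delimiter, one row per line.
    static String buildPayload(List<Object> keys, List<Object> values, char delim) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < keys.size(); i++) {
            sb.append(keys.get(i)).append(delim).append(values.get(i)).append('\n');
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        String payload = buildPayload(
                Arrays.asList(1000, 1001),
                Arrays.asList("login", "logout"),
                '|');
        System.out.print(payload); // 1000|login
                                   // 1001|logout
    }
}
```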
>
> // the other multiPut, using a multi-row INSERT
> @Override
> public void multiPut(final List<List<Object>> keys, final List<T> values) {
>     System.out.println("multiPut start ..... ");
>     System.out.println(keys); // print out the batch keys
>
>     final Iterator<List<List<Object>>> partitionedKeys =
>             Lists.partition(keys, config.getBatchSize()).iterator();
>     final Iterator<List<T>> partitionedValues =
>             Lists.partition(values, config.getBatchSize()).iterator();
>     while (partitionedKeys.hasNext() && partitionedValues.hasNext()) {
>         final List<List<Object>> pkeys = partitionedKeys.next();
>         final List<T> pvalues = partitionedValues.next();
>
>         int paramCount = 0;
>         switch (config.getType()) {
>             case OPAQUE:
>                 paramCount += config.getValueColumns().length;
>                 // intentional fall-through
>             case TRANSACTIONAL:
>                 paramCount += 1;
>                 // intentional fall-through
>             default:
>                 paramCount += (config.getKeyColumns().length
>                         + config.getValueColumns().length);
>         }
>
>         final StringBuilder queryBuilder = new StringBuilder()
>                 .append("WITH ")
>                 .append(" new_values (")
>                 .append(buildColumns())
>                 .append(") AS (")
>                 .append("VALUES ")
>                 .append(Joiner.on(", ").join(repeat("("
>                         + Joiner.on(",").join(repeat("?", paramCount)) + ")",
>                         pkeys.size())))
>                 .append(") ")
>                 .append("INSERT INTO ").append(config.getTable())
>                 .append("(").append(buildColumns()).append(") ")
>                 .append("SELECT ").append(buildColumns()).append(" ")
>                 .append("FROM new_values ");
>
>         final List<Object> params = flattenPutParams(pkeys, pvalues);
>         PreparedStatement ps = null;
>         int i = 0;
>         try {
>             ps = connection.prepareStatement(queryBuilder.toString());
>             for (final Object param : params) {
>                 ps.setObject(++i, param);
>             }
>             ps.execute();
>         } catch (final SQLException e) {
>             logger.error("Multiput update failed", e);
>         } finally {
>             if (ps != null) {
>                 try {
>                     ps.close();
>                 } catch (SQLException e) {
>                 }
>             }
>         }
>
>         logger.debug(String.format("%1$d keys flushed", pkeys.size()));
>     }
> }
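The VALUES placeholder list built with Joiner/repeat above can be reproduced with a small Guava-free helper (plain Java; the method name is hypothetical):

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class PlaceholderSql {
    // Build "(?,?,?), (?,?,?)" for rowCount rows of paramCount parameters each.
    static String valuesPlaceholders(int paramCount, int rowCount) {
        String oneRow = "(" + String.join(",", Collections.nCopies(paramCount, "?")) + ")";
        List<String> rows = new ArrayList<>();
        for (int r = 0; r < rowCount; r++) {
            rows.add(oneRow);
        }
        return String.join(", ", rows);
    }

    public static void main(String[] args) {
        // e.g. 3 columns per row, 2 rows in the batch
        System.out.println(valuesPlaceholders(3, 2)); // (?,?,?), (?,?,?)
    }
}
```

Each "?" is then bound positionally by the setObject loop, so the number of params must equal paramCount * rowCount.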
>
>
> This is the topology:
>
> topology.newStream("topictestspout", kafkaSpout)
>         .each(new Fields("str"), new JsonObjectParse(),
>                 new Fields("userid", "event"))
>         .groupBy(new Fields("userid", "event"))
>         .persistentAggregate(PostgresqlState.newFactory(config),
>                 new Fields("userid", "event"), new EventUpdater(),
>                 new Fields("eventword"));
>
>
>
> Running the above code, I can write into the DB, but it feels like I am
> only able to write the first row of each batch (I could be wrong). I print
> the keys at the beginning of multiPut, and it should be a list with
> multiple elements, but I only print out a one-element list, like [[1000]].
> Sorry to send you so much code; reading it could be tedious, but I hope
> you can help me diagnose the problem.
>
>
> thanks
>
> Alec
>
> On Wed, Dec 10, 2014 at 10:32 AM, P. Taylor Goetz <[email protected]>
> wrote:
>
>> Alec,
>>
>> For writing to a database in Trident, I would suggest implementing a
>> Trident State. That will allow you to process batches of tuples in bulk.
>> Trident functions operate on individual tuples, so writing to a database in
>> a function will result in one insert for every tuple (inefficient).
>>
>> You can find more information on Trident State here:
>> https://storm.apache.org/documentation/Trident-state
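As a concrete illustration of why batch-level writes matter, here is a small plain-Java sketch that accumulates tuples and flushes them in one call, the way a State's multiPut would (no Storm or JDBC; the "flush" just records batch sizes as a stand-in for one bulk INSERT/COPY):

```java
import java.util.ArrayList;
import java.util.List;

public class BatchWriter {
    private final List<String> buffer = new ArrayList<>();
    private final int batchSize;
    final List<Integer> flushedBatchSizes = new ArrayList<>(); // stand-in for DB writes

    BatchWriter(int batchSize) {
        this.batchSize = batchSize;
    }

    // Called once per tuple, but the expensive "write" happens once per batch.
    void add(String tuple) {
        buffer.add(tuple);
        if (buffer.size() >= batchSize) {
            flush();
        }
    }

    void flush() {
        if (!buffer.isEmpty()) {
            flushedBatchSizes.add(buffer.size()); // a real state would do one bulk write here
            buffer.clear();
        }
    }
}
```

With a batch size of 3 and 7 tuples, this performs three writes instead of seven.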
>>
>> -Taylor
>>
>> On Dec 5, 2014, at 5:51 PM, Sa Li <[email protected]> wrote:
>>
>> Hi, Dmytro
>>
>> Thanks for the reply. I am using a Trident topology; here is my static
>> class, but I think it is incorrect, since Trident processes data in
>> batches while the following code processes a single row.
>>
>> public static class WriteDB extends BaseFunction {
>>     private Connection conn = null;
>>     PreparedStatement ps = null;
>>
>>     @Override
>>     public final void execute(final TridentTuple tuple,
>>             final TridentCollector collector) {
>>         int user = tuple.getInteger(0);
>>         String value = tuple.getString(1);
>>         final StringBuilder queryBuilder = new StringBuilder()
>>                 .append("INSERT INTO test.state(userid, event) VALUES(")
>>                 .append(user)
>>                 .append(", '")
>>                 .append(value)
>>                 .append("')");
>>         System.out.println(queryBuilder.toString());
>>         try {
>>             ps = conn.prepareStatement(queryBuilder.toString());
>>             ps.execute();
>>             collector.emit(new Values(tuple.getStringByField("event")));
>>         } catch (SQLException ex) {
>>             System.err.println("Caught SQLException: " + ex.getMessage());
>>         } finally {
>>             if (ps != null) {
>>                 try {
>>                     ps.close();
>>                 } catch (SQLException ex) {
>>                 }
>>             }
>>         }
>>     }
>> }
>>
>>
>> Any idea?
>>
>> thanks
>>
>> On Fri, Dec 5, 2014 at 2:35 PM, Dima Dragan <[email protected]>
>> wrote:
>>
>>> Hi,
>>>
>>> I think a better approach is creating a static class with thread-safe
>>> singleton initialization of a connection pool (for example, I use HikariCP
>>> for SQL databases and JedisPool for Redis) and providing public access to
>>> it so bolts can get connections. Initialize the pool in prepare(), then
>>> take a connection, execute something, and give it back in execute().
>>>
>>> That way every worker gets its own pool, which is shared between bolts.
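The pattern Dmytro describes can be sketched without a real pool library. The initialization-on-demand holder below is one thread-safe way to lazily share a per-worker (per-JVM) singleton; Pool here is a toy stand-in, not HikariCP:

```java
import java.util.concurrent.ConcurrentLinkedQueue;

public class PoolHolder {
    // Tiny stand-in "pool": a queue of reusable connection objects.
    static class Pool {
        private final ConcurrentLinkedQueue<Object> free = new ConcurrentLinkedQueue<>();

        Pool(int size) {
            for (int i = 0; i < size; i++) {
                free.add(new Object());
            }
        }

        Object take() { return free.poll(); }       // borrow a connection
        void give(Object conn) { free.add(conn); }  // return it after use
    }

    // Initialization-on-demand holder idiom: the JVM guarantees Holder is
    // initialized exactly once, on first access, with no explicit locking.
    private static class Holder {
        static final Pool INSTANCE = new Pool(4);
    }

    public static Pool getPool() {
        return Holder.INSTANCE;
    }
}
```

Every bolt in the worker calls PoolHolder.getPool() and gets the same instance, so connections are shared without each bolt opening its own.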
>>>
>>> Best regards,
>>> Dmytro Dragan
>>> On Dec 5, 2014 11:57 PM, "Sa Li" <[email protected]> wrote:
>>>
>>>> Hi, all
>>>>
>>>> Right now I am able to write tuples into a file and use the COPY command
>>>> to load them into the database, but this is obviously not a perfect
>>>> solution; it adds complexity and overhead. Any ideas to make it simpler?
>>>>
>>>>
>>>> thanks
>>>>
>>>>
>>>> Alec
>>>>
>>>
>>
>>
>