Hi, I find trident topology is  a elegant solution to this problem, suppose I 
have a spout emitting user logging information, here is the model

public class ActionLog implements Serializable{
    private static final long serialVersionUID = 22583958918591L;
    private String actionType;
    private String passPort;
    private String game;
    private Date sTime;    
}

if i want to caculate unique user of each game in real-time,here is my topology


inputStream.each(new Fields("actionLog"), new addNewFiled("user"), new 
Fields("user"))
                    .groupBy(new Fields("user"))
                    .aggregate(new One(), new Fields("one"))  //this step is 
equal to sql distinct
                    .persistentAggregate(new 
RedisState.SnapshottableStateFactory("distinctCountUser"), new Count(), new 
Fields("distinCount"))
                    .newValuesStream()
                    .each(new Fields("distinCount"), new justLogger());

there is an another example  
http://storm.incubator.apache.org/documentation/Trident-tutorial.html    Reach

The topology works on my local cluster, but I dont think very throughly, I 
wannt to put this implementation on production, so there are still lot work to 
be done, any suggestiong and ideas are truly welcome.
2014-07-18 



唐思成 



发件人: Sam Goodwin 
发送时间: 2014-07-17  05:45:03 
收件人: [email protected] 
抄送: 
主题: Re: How to implememt distinct count in trident topolgy? 
 
Even with Redis you'll need to maintain the sliding window yourself.


Does it need to be exact? If you want to estimate the number of distinct users 
seen in a sliding window then use the HyperLogLog data structure with a ring 
buffer. It's fast, accurate and memory efficient. For example, allocate 60 
HyperLogLog structure for 60 minutes (1 per minute) and then use a Ring Buffer 
algorithm to maintain the sliding window. When you want the total count you can 
just merge all the HyperLogLog structures and extract the count. It's not exact 
but it's close enough and can be tuned based on your precision requirements.

Twitter's alegebird package has a monoid implementation of the HLL algorithm 
https://github.com/twitter/algebird/blob/develop/algebird-core/src/main/scala/com/twitter/algebird/HyperLogLog.scala#L353

 which basically means you can merge them into 1 and should be all you need. 
The Redis HLL also allows you to merge two HLLs. Please note that using the 
Redis HLL algorithm will make it harder to implement a transactional topology. 
If you ever want that then I suggest you implement the above algorithm and 
serialize/deserialize in your TridentState.


If you want a more precise window you can just increase the bucket counts. You 
may also be able to adapt this exponential histogram sliding window algorithm 
for your needs 
http://www-cs-students.stanford.edu/~datar/papers/sicomp_streams.pdf



On Wed, Jul 16, 2014 at 1:21 PM, Danijel Schiavuzzi <[email protected]> 
wrote:

Take a look at a distributed data structure server, for example Redis. The are 
various Storm integrations available.

On Monday, July 14, 2014, 唐思成 <[email protected]> wrote:

Use case is simple, count unique user in for in a window slide, and I found the 
common solutions over the Internet is to use HashSet to fliter the duplicated 
user,like this 

public class Distinct extends BaseFilter {
    private static final long serialVersionUID = 1L;
    private Set<String> distincter = Collections.synchronizedSet(new 
HashSet<String>());
    @Override
    public boolean isKeep(TridentTuple tuple) {
        String id = this.getId(tuple);
        return distincter.add(id);
    }
    public String getId(TridentTuple t) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < t.size(); i++) {
            sb.append(t.getString(i));
        }
        return sb.toString();
    }
}

However, the HashSet is stored in memory, when the data grows to a very large 
level, I think it will cause a OOM.
So is there a scalable solution?

2014-07-14 



唐思成 


-- 
Danijel Schiavuzzi

E: [email protected]
W: www.schiavuzzi.com
T: +385989035562
Skype: danijels7

Reply via email to