Hi, I find a Trident topology an elegant solution to this problem. Suppose I
have a spout emitting user action log records; here is the model:
public class ActionLog implements Serializable {
    private static final long serialVersionUID = 22583958918591L;
    private String actionType;
    private String passPort;
    private String game;
    private Date sTime;
}
If I want to calculate the unique users of each game in real time, here is my topology:
inputStream.each(new Fields("actionLog"), new AddNewField("user"), new Fields("user"))
    .groupBy(new Fields("user"))
    .aggregate(new One(), new Fields("one")) // this step is equivalent to SQL DISTINCT
    .persistentAggregate(new RedisState.SnapshottableStateFactory("distinctCountUser"),
            new Count(), new Fields("distinctCount"))
    .newValuesStream()
    .each(new Fields("distinctCount"), new JustLogger());
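For reference, the One aggregator used above can be written exactly like the one in the Trident tutorial's Reach example: a CombinerAggregator that emits the constant 1 for every grouped key, so the downstream Count counts groups (distinct users) rather than raw tuples.

import storm.trident.operation.CombinerAggregator;
import storm.trident.tuple.TridentTuple;

// Emits 1 per group, so a downstream Count counts distinct keys,
// not raw tuples (same as the One aggregator in the Reach example).
public class One implements CombinerAggregator<Integer> {
    @Override
    public Integer init(TridentTuple tuple) {
        return 1;
    }

    @Override
    public Integer combine(Integer val1, Integer val2) {
        return 1;
    }

    @Override
    public Integer zero() {
        return 1;
    }
}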
There is another example, the Reach topology in the Trident tutorial:
http://storm.incubator.apache.org/documentation/Trident-tutorial.html
The topology works on my local cluster, but I haven't thought it through very
thoroughly. I want to put this implementation into production, so there is
still a lot of work to be done; any suggestions and ideas are truly welcome.
2014-07-18
唐思成
From: Sam Goodwin
Sent: 2014-07-17 05:45:03
To: [email protected]
Cc:
Subject: Re: How to implement distinct count in trident topology?
Even with Redis you'll need to maintain the sliding window yourself.
Does it need to be exact? If you want to estimate the number of distinct users
seen in a sliding window, use the HyperLogLog (HLL) data structure with a ring
buffer. It's fast, accurate, and memory-efficient. For example, allocate 60
HyperLogLog structures for 60 minutes (one per minute) and use a ring-buffer
algorithm to maintain the sliding window. When you want the total count, you
can just merge all the HyperLogLog structures and extract the count. It's not
exact, but it's close enough and can be tuned based on your precision
requirements.
Twitter's Algebird package has a monoid implementation of the HLL algorithm
https://github.com/twitter/algebird/blob/develop/algebird-core/src/main/scala/com/twitter/algebird/HyperLogLog.scala#L353
which basically means you can merge them into one; that should be all you need.
The Redis HLL also allows you to merge two HLLs. Please note that using the
Redis HLL algorithm will make it harder to implement a transactional topology.
If you ever want that then I suggest you implement the above algorithm and
serialize/deserialize in your TridentState.
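For illustration, here is roughly what the Redis route might look like through the Jedis client; this is a sketch, not Sam's code. The key names are hypothetical, and the PF* commands require Redis 2.8.9 or later:

import redis.clients.jedis.Jedis;

public class RedisHllExample {
    public static void main(String[] args) {
        try (Jedis jedis = new Jedis("localhost", 6379)) {
            // Hypothetical per-minute keys; Redis maintains the HLLs server-side.
            jedis.pfadd("users:minute:42", "passport-1", "passport-2");
            jedis.pfadd("users:minute:43", "passport-2", "passport-3");
            // Merge the per-minute HLLs into one window-wide HLL, then read the estimate.
            jedis.pfmerge("users:window", "users:minute:42", "users:minute:43");
            System.out.println(jedis.pfcount("users:window")); // approximately 3
        }
    }
}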
If you want a more precise window, you can just increase the bucket count. You
may also be able to adapt this exponential-histogram sliding-window algorithm
for your needs:
http://www-cs-students.stanford.edu/~datar/papers/sicomp_streams.pdf
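Here is a minimal sketch of the per-minute ring buffer described above. It assumes the clearspring stream-lib HyperLogLog class (any mergeable HLL, such as Algebird's, would work); the class name, LOG2M value, and window handling are illustrative, not a definitive implementation:

import java.util.Arrays;
import com.clearspring.analytics.stream.cardinality.CardinalityMergeException;
import com.clearspring.analytics.stream.cardinality.HyperLogLog;
import com.clearspring.analytics.stream.cardinality.ICardinality;

// One HLL per minute; the array is reused as a ring buffer over the window.
public class SlidingWindowDistinct {
    private static final int LOG2M = 14; // roughly 0.8% standard error per bucket
    private final HyperLogLog[] buckets;
    private long currentMinute = -1;

    public SlidingWindowDistinct(int windowMinutes) {
        buckets = new HyperLogLog[windowMinutes];
        for (int i = 0; i < windowMinutes; i++) {
            buckets[i] = new HyperLogLog(LOG2M);
        }
    }

    public synchronized void add(String userId, long timestampMillis) {
        long minute = timestampMillis / 60_000L;
        if (currentMinute < 0) {
            currentMinute = minute;
        }
        // Expire every bucket the window has slid past (handles idle gaps too).
        for (long m = currentMinute + 1; m <= minute && m <= currentMinute + buckets.length; m++) {
            buckets[(int) (m % buckets.length)] = new HyperLogLog(LOG2M);
        }
        currentMinute = Math.max(currentMinute, minute);
        // Note: events older than the whole window should be dropped in production.
        buckets[(int) (minute % buckets.length)].offer(userId);
    }

    // Merge all per-minute HLLs and extract the estimated distinct count.
    public synchronized long distinctCount() throws CardinalityMergeException {
        ICardinality merged = buckets[0].merge(Arrays.copyOfRange(buckets, 1, buckets.length));
        return merged.cardinality();
    }
}

Serializing an object like this into your TridentState, as suggested above, keeps the approach compatible with a transactional topology.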
On Wed, Jul 16, 2014 at 1:21 PM, Danijel Schiavuzzi <[email protected]>
wrote:
Take a look at a distributed data structure server, for example Redis. There
are various Storm integrations available.
On Monday, July 14, 2014, 唐思成 <[email protected]> wrote:
The use case is simple: count unique users within a sliding window. The common
solution I found on the Internet is to use a HashSet to filter out duplicate
users, like this:
public class Distinct extends BaseFilter {
    private static final long serialVersionUID = 1L;

    // Keeps every id ever seen; this set grows without bound.
    private Set<String> distincter = Collections.synchronizedSet(new HashSet<String>());

    @Override
    public boolean isKeep(TridentTuple tuple) {
        String id = this.getId(tuple);
        return distincter.add(id); // true only the first time an id is seen
    }

    public String getId(TridentTuple t) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < t.size(); i++) {
            sb.append(t.getString(i));
        }
        return sb.toString();
    }
}
However, the HashSet is stored in memory; when the data grows very large, I
think it will cause an OOM.
So is there a scalable solution?
2014-07-14
唐思成
--
Danijel Schiavuzzi
E: [email protected]
W: www.schiavuzzi.com
T: +385989035562
Skype: danijels7