feed queue fetcher with hadoop/zookeeper/gearman?

2010-04-12 Thread Thomas Koch
Hi,

I'd like to implement a feed loader with Hadoop and most likely HBase. I've 
got around 1 million feeds, that should be loaded and checked for new entries. 
However the feeds have different priorities based on their average update 
frequency in the past and their relevance.
The feeds (url, last_fetched timestamp, priority) are stored in HBase. How 
could I implement the fetch queue for the loaders?

- An hourly map-reduce job to produce new queues for each node and save them 
on the nodes?
  - but how to know, which feeds have been fetched in the last hour?
  - what to do, if a fetch node dies?

- Store a fetch queue in zookeeper and add to the queue with map-reduce each 
hour?
  - Isn't that too much load for zookeeper? (I could make one znode for a 
bunch of urls...?)

- Use gearman to store the fetch queue?
  - But the gearman job server still seems to be a SPOF

[1] http://gearman.org

Thank you!

Thomas Koch, http://www.koch.ro


Re: feed queue fetcher with hadoop/zookeeper/gearman?

2010-04-12 Thread Mahadev Konar
Hi Thomas,
  There are a couple of projects inside Yahoo! that use ZooKeeper as an
event manager for feed processing.
  
I am little bit unclear on your example below. As I understand it-

1. There are 1 million feeds that will be stored in Hbase.
2. A map reduce job will be run on these feeds to find out which feeds need
to be fetched. 
3. This will create queues in ZooKeeper to fetch the feeds
4.  Workers will pull items from this queue and process feeds

Did I understand it correctly? Also, if above is the case, how many queue
items would you anticipate be accumulated every hour?

Thanks
mahadev


On 4/12/10 1:21 AM, Thomas Koch tho...@koch.ro wrote:

 Hi,
 
 I'd like to implement a feed loader with Hadoop and most likely HBase. I've
 got around 1 million feeds, that should be loaded and checked for new entries.
 However the feeds have different priorities based on their average update
 frequency in the past and their relevance.
 The feeds (url, last_fetched timestamp, priority) are stored in HBase. How
 could I implement the fetch queue for the loaders?
 
 - An hourly map-reduce job to produce new queues for each node and save them
 on the nodes?
   - but how to know, which feeds have been fetched in the last hour?
   - what to do, if a fetch node dies?
 
 - Store a fetch queue in zookeeper and add to the queue with map-reduce each
 hour?
   - Isn't that too much load for zookeeper? (I could make one znode for a
 bunch of urls...?)
 
 - Use gearman to store the fetch queue?
   - But the gearman job server still seems to be a SPOF
 
 [1] http://gearman.org
 
 Thank you!
 
 Thomas Koch, http://www.koch.ro
 



Re: feed queue fetcher with hadoop/zookeeper/gearman?

2010-04-12 Thread Thomas Koch
Mahadev Konar:
 Hi Thomas,
   There are a couple of projects inside Yahoo! that use ZooKeeper as an
 event manager for feed processing.
 
 I am little bit unclear on your example below. As I understand it-
 
 1. There are 1 million feeds that will be stored in Hbase.
 2. A map reduce job will be run on these feeds to find out which feeds need
 to be fetched.
 3. This will create queues in ZooKeeper to fetch the feeds
 4.  Workers will pull items from this queue and process feeds
 
 Did I understand it correctly? Also, if above is the case, how many queue
 items would you anticipate be accumulated every hour?
Yes. That's exactly what I'm thinking about. Currently one node processes like 
2 Feeds an hour and we have 5 feed-fetch-nodes. This would mean ~10 
queue items/hour. Each queue item should carry some meta informations, most 
important the feed items, that are already known to the system so that only 
new items get processed.

Thomas Koch, http://www.koch.ro


Re: feed queue fetcher with hadoop/zookeeper/gearman?

2010-04-12 Thread Patrick Hunt
See this environment http://bit.ly/4ekN8G. Subsequently I used the 3 
server setup, each configured with 8gig of heap in the jvm and 4 
CPUs/jvm (I think I used 10second session timeouts for this) for some 
additional testing that I've not written up yet. I was able to run ~500 
clients (same test script) in parallel. So that means about 5million 
znodes and 25million watches.


The thing to watch out for is:
1) most important is you need to tune the GC, in particular you need to 
turn on CMS and incremental GC. OTW the GC pauses will cause high 
latencies and you will see session timeouts

2) you need a stable network, esp for the serving ensemble
3) sufficient memory available in the JVM heap
4) no IO issues on the serving hosts (VM's, overloaded disk, swapping, 
etc...)


In your case you've got less going on with only 30 or so writes per 
second. The performance page shows that your going to be well below the 
max ops/sec we see in our testing harness.


btw, gearman would also be a good choice imo. I've looked at integrating 
ZK with gearman, there are two potentials. 1) as an additional backend 
persistent store for gearman, 2) as a way of addressing gearman 
failover. 1 is pretty simple to do today, 2 is harder, would require 
some changes to gearman itself but I think it would be useful (automatic 
failover of persistent tasks if a gearman server fails).


Patrick

On 04/12/2010 10:49 AM, Thomas Koch wrote:

Mahadev Konar:

Hi Thomas,
   There are a couple of projects inside Yahoo! that use ZooKeeper as an
event manager for feed processing.

I am little bit unclear on your example below. As I understand it-

1. There are 1 million feeds that will be stored in Hbase.
2. A map reduce job will be run on these feeds to find out which feeds need
to be fetched.
3. This will create queues in ZooKeeper to fetch the feeds
4.  Workers will pull items from this queue and process feeds

Did I understand it correctly? Also, if above is the case, how many queue
items would you anticipate be accumulated every hour?

Yes. That's exactly what I'm thinking about. Currently one node processes like
2 Feeds an hour and we have 5 feed-fetch-nodes. This would mean ~10
queue items/hour. Each queue item should carry some meta informations, most
important the feed items, that are already known to the system so that only
new items get processed.

Thomas Koch, http://www.koch.ro