Re: Pregel

2009-06-26 Thread Owen O'Malley


On Jun 25, 2009, at 9:42 PM, Mark Kerzner wrote:

my guess, as good as anybody's, is that Pregel is to large graphs what
Hadoop is to large datasets.


I think it is much more likely to be a language that allows you to easily
define fixed-point algorithms. I would imagine a distributed version
of something similar to Michal Young's GenSet. http://portal.acm.org/citation.cfm?doid=586094.586108


I've been trying to figure out how to justify working on a project  
like that for a couple of years, but haven't yet. (I have a background  
in program static analysis, so I've implemented similar stuff.)



In other words, Pregel is the next natural step
for massively scalable computations after Hadoop.


I wonder if it uses map/reduce as a base or not. It would be easier to  
use map/reduce, but a direct implementation would be more performant.  
In either case, it is a new hammer. From what I see, it likely won't  
replace map/reduce, pig, or hive; but rather support a different class  
of applications much more directly than you can under map/reduce.


-- Owen



Re: A simple performance benchmark for Hadoop, Hive and Pig

2009-06-19 Thread Owen O'Malley
On Thu, Jun 18, 2009 at 9:29 PM, Zheng Shao zs...@facebook.com wrote:


 Yuntao Jia, our intern this summer, did a simple performance benchmark for
 Hadoop, Hive and Pig based on the queries in the SIGMOD 2009 paper: A
 Comparison of Approaches to Large-Scale Data Analysis


It should be noted that no one on the Pig team was involved in setting up
the benchmarks and the queries don't follow the Pig cookbook suggestions for
writing efficient queries, so these results should be considered *extremely*
preliminary. Furthermore, I can't see any way that Hive should be able to
beat raw map/reduce, since Hive uses map/reduce to run the job.

In the future, it would be better to involve the respective communities
(mapreduce-dev and pig-dev) far before pushing benchmark results out to the
user lists. The Hadoop project, which includes all three subprojects, needs
to be a cooperative community that is trying to build the best software we
can. Getting benchmark numbers is good, but it is better done in a
collaborative manner.

-- Owen


Re: Practical limit on emitted map/reduce values

2009-06-18 Thread Owen O'Malley
Keys and values can be large. They are certainly capped above by
Java's 2GB limit on byte arrays. More practically, you will have
problems running out of memory with keys or values of 100 MB. There is
no restriction that a key/value pair fits in a single hdfs block, but
performance would suffer. (In particular, the FileInputFormats split
at block sized chunks, which means you will have maps that scan an
entire block without processing anything.)

-- Owen


Re: Practical limit on emitted map/reduce values

2009-06-18 Thread Owen O'Malley

On Jun 18, 2009, at 8:45 AM, Leon Mergen wrote:

Could you perhaps elaborate on that 100 MB limit ? Is that due to a  
limit that is caused by the Java VM heap size ? If so, could that,  
for example, be increased to 512MB by setting mapred.child.java.opts  
to '-Xmx512m' ?


A couple of points:
  1. The 100MB was just for ballpark calculations. Of course if you  
have a large heap, you can fit larger values. Don't forget that the  
framework is allocating big chunks of the heap for its own buffers,  
when figuring out how big to make your heaps.
  2. Having large keys is much harder than large values. When doing a  
N-way merge, the framework has N+1 keys and 1 value in memory at a time.
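
To Leon's question, a minimal sketch of bumping the task heap in the job
config while leaving room for the framework's own buffers (the numbers are
illustrative only, and MyJob is a placeholder class):

// Hypothetical sketch: raise the per-task JVM heap but remember the
// framework carves its sort buffers out of that same heap.
JobConf conf = new JobConf(MyJob.class);          // MyJob is a placeholder
conf.set("mapred.child.java.opts", "-Xmx512m");   // per-task JVM heap
conf.setInt("io.sort.mb", 100);                   // map-side sort buffer,
                                                  // taken out of that 512 MB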


-- Owen


Re: multiple file input

2009-06-18 Thread Owen O'Malley

On Jun 18, 2009, at 10:56 AM, pmg wrote:

Each line from FileA gets compared with every line from FileB1,  
FileB2 etc.

etc. FileB1, FileB2 etc. are in a different input directory


In the general case, I'd define an InputFormat that takes two  
directories, computes the input splits for each directory and  
generates a new list of InputSplits that is the cross-product of the  
two lists. So instead of FileSplit, it would use a FileSplitPair that  
gives the FileSplit for dir1 and the FileSplit for dir2 and the record  
reader would return a TextPair with left and right records (ie.  
lines). Clearly, you read the first line of split1 and cross it by  
each line from split2, then move to the second line of split1 and  
process each line from split2, etc.


You'll need to ensure that you don't overwhelm the system with too many
input splits (i.e. maps). Also don't forget that the work per machine,
N^2/M, grows much faster with the size of the input (N) than a fixed set
of M machines can handle in a fixed amount of time.
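
If it helps, here is a minimal sketch of the paired split, with a
hypothetical class name (FileSplitPair) and no error handling; the
InputFormat's getSplits() would then be a nested loop over the two split
lists, emitting one pair per combination:

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;

// One split of the cross-product: a split from dir1 paired with one from dir2.
public class FileSplitPair implements InputSplit {
  private FileSplit left;
  private FileSplit right;

  public FileSplitPair() {                      // needed for deserialization
    left = new FileSplit(new Path("/"), 0, 0, (String[]) null);
    right = new FileSplit(new Path("/"), 0, 0, (String[]) null);
  }

  public FileSplitPair(FileSplit left, FileSplit right) {
    this.left = left;
    this.right = right;
  }

  public FileSplit getLeft() { return left; }
  public FileSplit getRight() { return right; }

  public long getLength() throws IOException {
    return left.getLength() + right.getLength();
  }

  public String[] getLocations() throws IOException {
    return left.getLocations();                 // prefer locality on the left split
  }

  public void write(DataOutput out) throws IOException {
    left.write(out);
    right.write(out);
  }

  public void readFields(DataInput in) throws IOException {
    left.readFields(in);
    right.readFields(in);
  }
}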



Two input directories

1. input1 directory with a single file of 600K records - FileA
2. input2 directory segmented into different files with 2Million  
records -

FileB1, FileB2 etc.


In this particular case, it would be right to load all of FileA into  
memory and process the chunks of FileB/part-*. Then it would be much  
faster than needing to re-read the file over and over again, but  
otherwise it would be the same.


-- Owen


Re: Anyway to sort keys before Reduce function in Hadoop ?

2009-06-17 Thread Owen O'Malley
On Wed, Jun 17, 2009 at 12:26 PM, Chuck Lam chuck@gmail.com wrote:

 an alternative is to create a new WritableComparator and then set it
 in the JobConf object with the method setOutputKeyComparatorClass().
 You can use IntWritable.Comparator as a start.


The important part of that is to define a RawComparator for your key class
and call JobConf.setOutputKeyComparatorClass with it. So if you wanted to
invert the default sort order of IntWritable keys, you could:

public class InvertedIntWritableComparator extends IntWritable.Comparator {
  public int compare(byte[] b1, int s1, int l1,
                     byte[] b2, int s2, int l2) {
    return -1 * super.compare(b1, s1, l1, b2, s2, l2);
  }
}

then

job.setOutputKeyComparatorClass(InvertedIntWritableComparator.class);

-- Owen


Re: MapContext.getInputSplit() returns nothing

2009-06-16 Thread Owen O'Malley

Sorry, I forget how much isn't clear to people who are just starting.

FileInputFormat creates FileSplits. The serialization is very stable  
and can't be changed without breaking things. The reason that pipes  
can't stringify it is that the string form of input splits are  
ambiguous (and since it is user code, we really can't make assumptions  
about it). The format of FileSplit is:


16 bit filename byte length
filename in bytes
64 bit offset
64 bit length

Technically the filename uses a funky UTF-8 encoding, but in practice,
as long as the filename contains only ASCII characters, the bytes are plain
ASCII. Look at org.apache.hadoop.io.UTF8.writeString for the precise definition.
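
For illustration, a minimal Java sketch of decoding that layout (the class
and method names are made up; a C++ version would follow the same steps):

import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;

public class FileSplitParser {
  public static void parse(byte[] rawSplit) throws IOException {
    DataInputStream in = new DataInputStream(new ByteArrayInputStream(rawSplit));
    int nameLen = in.readUnsignedShort();     // 16-bit filename byte length
    byte[] nameBytes = new byte[nameLen];
    in.readFully(nameBytes);
    String filename = new String(nameBytes, "UTF-8"); // close enough for ASCII names
    long offset = in.readLong();              // 64-bit offset
    long length = in.readLong();              // 64-bit length
    System.out.println(filename + " " + offset + " " + length);
  }
}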


-- Owen


Re: MapContext.getInputSplit() returns nothing

2009-06-15 Thread Owen O'Malley

*Sigh* We need Avro for input splits.

That is the expected behavior. It would be great if someone wrote a C++
FileInputSplit class that took a binary string and converted it back
to a filename, offset, and length.


-- Owen


Re: speedy Google Maps driving directions like implementation

2009-06-15 Thread Owen O'Malley

On Jun 10, 2009, at 2:43 AM, Lukáš Vlček wrote:

I am wondering how Google implemented the driving directions function in the
Maps. More specifically, how did they make it so fast? I asked a Google
engineer about this and all he told me is just that there are a bunch

of MapReduce cycles involved in this process


Like search, the key is to use batch map/reduce jobs to generate the  
indexes that are used to answer the query in real-time. The right  
question is what kind of indexes can be used to answer the routing  
question quickly. Generalizing back to a graph, you could use map/ 
reduce jobs to generate the all-pairs-shortest-distance matrix. Then  
it is easy to use the matrix to solve arbitrary  shortest path  
problems in linear time.


-- Owen

Re: Hadoop benchmarking

2009-06-10 Thread Owen O'Malley

Take a look at Arun's slide deck on Hadoop performance:

http://bit.ly/EDCg3

It is important to get io.sort.mb large enough, and io.sort.factor
should be closer to 100 than the default 10. I'd also use large block sizes
to reduce the number of maps. Please see the deck for other important
factors.
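
As a hedged illustration of those settings in a job config (the values are
examples, not recommendations, and dfs.block.size only affects files written
with that configuration; MyJob is a placeholder):

// Illustrative values only; tune to your cluster and job.
JobConf conf = new JobConf(MyJob.class);            // MyJob is a placeholder
conf.setInt("io.sort.mb", 200);                     // big enough map-side sort buffer
conf.setInt("io.sort.factor", 100);                 // merge width closer to 100 than 10
conf.setLong("dfs.block.size", 256L * 1024 * 1024); // larger blocks => fewer maps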


-- Owen


Re: Image indexing/searching with Hadoop and MPI

2009-06-07 Thread Owen O'Malley
 Ok I can understand your point - but I am sure that some people have been
 trying to use the map-reduce programming model to do CFD, or any other
 scientific computing.
 Any experience in this area from the list?

I know of one project that assumes it has an entire Hadoop cluster,
and generates the hostnames in the Mapper and uses those host lists in
the Reducer to launch an MPI job. They do it because it provides a
higher efficiency for doing very small data transfers. The alternative
was doing a long chain of map/reduce jobs that have very small outputs
from each phase. I wouldn't recommend using MPI under map/reduce in
general, since it involves making a lot of assumptions about your
application. In particular, to avoid killing your cluster you
shouldn't use checkpoints in your application; just rerun the
application from the beginning on failures. That implies that the
application can't run very long (upper bound of probably 30 minutes on
2000 nodes).

That said, if you want to run other styles of applications, you really
want a two-level scheduler, where the first-level scheduler allocates
nodes (or partial nodes) to jobs (or frameworks). Effectively, that is
what Hadoop On Demand (HOD) was doing with Torque, but I suspect there
will be a more performant solution than HOD within the next year.

-- Owen


Re: Fastlz coming?

2009-06-04 Thread Owen O'Malley


On Jun 4, 2009, at 11:19 AM, Kris Jirapinyo wrote:


Hopefully we can have a new page on the hadoop wiki on how to
use custom compression so that people won't have to go search  
through the

threads to find the answer in the future.


Yes, it would be extremely useful if you could start a wiki page with  
your discoveries. Since you had to discover it, it will be clear to  
you what needs to be documented.


On a similar note, I always find that I write the best comments in  
code after I leave a section for 6 months and have to figure out what  
I meant when I wrote it. Otherwise, it is too obvious. *smile*


-- Owen


Re: InputFormat for fixed-width records?

2009-05-28 Thread Owen O'Malley

On May 28, 2009, at 5:15 AM, Stuart White wrote:


I need to process a dataset that contains text records of fixed length
in bytes.  For example, each record may be 100 bytes in length


The update to the terasort example has an InputFormat that does  
exactly that. The key is 10 bytes and the value is the next 90 bytes.  
It is pretty easy to write, but I should upload it soon. The output  
types are Text, but they just have the binary data in them.
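
A rough sketch of such a record reader, for illustration only (this is not
the terasort code; a real one would also align splits on 100-byte
boundaries):

import java.io.IOException;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.RecordReader;

public class FixedWidthRecordReader implements RecordReader<Text, Text> {
  private static final int KEY_LEN = 10, VALUE_LEN = 90;   // 100-byte records
  private final FSDataInputStream in;
  private final long start, end;
  private long pos;
  private final byte[] buffer = new byte[KEY_LEN + VALUE_LEN];

  public FixedWidthRecordReader(FSDataInputStream in, long start, long length) {
    this.in = in;
    this.start = start;
    this.end = start + length;
    this.pos = start;
  }

  public boolean next(Text key, Text value) throws IOException {
    if (pos >= end) {
      return false;
    }
    in.readFully(pos, buffer);              // positioned read of one whole record
    key.set(buffer, 0, KEY_LEN);            // raw bytes dropped straight into Text
    value.set(buffer, KEY_LEN, VALUE_LEN);
    pos += buffer.length;
    return true;
  }

  public Text createKey() { return new Text(); }
  public Text createValue() { return new Text(); }
  public long getPos() { return pos; }
  public float getProgress() { return (pos - start) / (float) (end - start); }
  public void close() throws IOException { in.close(); }
}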


-- Owen


Re: Linking against Hive in Hadoop development tree

2009-05-20 Thread Owen O'Malley


On May 20, 2009, at 3:07 AM, Tom White wrote:


Why does mapred depend on hdfs? MapReduce should only depend on the
FileSystem interface, shouldn't it?


Yes, I should have been consistent. In terms of compile-time  
dependences, mapred only depends on core.


-- Owen


Re: Linking against Hive in Hadoop development tree

2009-05-20 Thread Owen O'Malley


On May 15, 2009, at 3:25 PM, Aaron Kimball wrote:

Yikes. So part of sqoop would wind up in one source repository, and  
part in

another? This makes my head hurt a bit.


I'd say rather that Sqoop is in Mapred and the adapter to Hive is in  
Hive.



I'm also not convinced how that helps.


Clearly, what you need to arrange is to not have a compile time  
dependence on Hive. Clearly we don't want cycles in the dependence  
tree, so you need to figure out how to make the adapter for Hive a  
plugin rather than a part of the Sqoop core.


-- Owen


Re: Beware sun's jvm version 1.6.0_05-b13 on linux

2009-05-18 Thread Owen O'Malley


On May 18, 2009, at 3:42 AM, Steve Loughran wrote:

Presumably its one of those hard-to-reproduce race conditions that  
only surfaces under load on a big cluster so is hard to replicate in  
a unit test, right?


Yes. It reliably happens on a 100TB or larger sort, but almost never  
happens on a small scale.


-- Owen


Beware sun's jvm version 1.6.0_05-b13 on linux

2009-05-15 Thread Owen O'Malley
We have observed that the default jvm on RedHat 5 can cause  
significant data corruption in the map/reduce shuffle for those using  
Hadoop 0.20. In particular, the guilty jvm is:


java version "1.6.0_05"
Java(TM) SE Runtime Environment (build 1.6.0_05-b13)
Java HotSpot(TM) Server VM (build 10.0-b19, mixed mode)

By upgrading to jvm build 1.6.0_13-b03, we fixed the problem. The  
observed behavior is that Jetty serves up random bytes from other  
transfers. In particular, some of them were valid transfers to the  
wrong reduce. We suspect the relevant java bug is:


http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=6403933

We have also filed a bug on Hadoop to add sanity checks on the shuffle  
that will work around the problem:


https://issues.apache.org/jira/browse/HADOOP-5783

-- Owen


Re: Linking against Hive in Hadoop development tree

2009-05-15 Thread Owen O'Malley

On May 15, 2009, at 2:05 PM, Aaron Kimball wrote:


In either case, there's a dependency there.


You need to split it so that there are no cycles in the dependency  
tree. In the short term it looks like:


avro:
core: avro
hdfs: core
mapred: hdfs, core
hive: mapred, core
pig: mapred, core

Adding a dependence from core to hive would be bad. To integrate with  
Hive, you need to add a contrib module to Hive that adds it.


-- Owen


Re: Access counters from within Reducer#configure() ?

2009-05-11 Thread Owen O'Malley


On May 11, 2009, at 8:48 AM, Stuart White wrote:


I'd like to be able to access my job's counters from within my
Reducer's configure() method (so I can know how many records were
output from my mappers).  Is this possible?  Thanks!


The counters aren't available and if they were the information you  
want isn't there. Counters only flow up to the JobTracker. They never  
flow down to the task attempts, so each task only sees its own counters.


-- Owen


Re: Is it possible to sort intermediate values and final values?

2009-05-07 Thread Owen O'Malley


On May 7, 2009, at 12:38 PM, Foss User wrote:


Where can I find this example. I was not able to find it in the
src/examples directory.


It is in 0.20.

http://svn.apache.org/repos/asf/hadoop/core/trunk/src/examples/org/apache/hadoop/examples/SecondarySort.java

-- Owen


Re: What User Accounts Do People Use For Team Dev?

2009-05-05 Thread Owen O'Malley
Best is to use one user for map/reduce and another for hdfs. Neither
of them should be root or real users. With the setuid patch
(HADOOP-4490), it is possible to run the jobs as the submitted user.
Note that if you do that, you no doubt want to block certain system
uids (bin, mysql, etc.)

-- Owen


Re: Implementing compareTo in user-written keys where one extends the other is error prone

2009-04-30 Thread Owen O'Malley
If you use custom key types, you really should be defining a  
RawComparator. It will perform much much better.
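
For illustration, a minimal sketch of a raw comparator for a hypothetical
key class (MyKey) whose write() emits a 4-byte int followed by the rest of
its fields; adjust the byte arithmetic to match your own key:

import org.apache.hadoop.io.WritableComparator;

// MyKey is a hypothetical WritableComparable whose serialized form starts
// with a 4-byte int.
public class MyKeyComparator extends WritableComparator {
  public MyKeyComparator() {
    super(MyKey.class);
  }

  public int compare(byte[] b1, int s1, int l1,
                     byte[] b2, int s2, int l2) {
    int i1 = readInt(b1, s1);     // compare the leading ints without
    int i2 = readInt(b2, s2);     // deserializing the whole key
    if (i1 != i2) {
      return i1 < i2 ? -1 : 1;
    }
    // fall back to comparing the remaining bytes lexicographically
    return compareBytes(b1, s1 + 4, l1 - 4, b2, s2 + 4, l2 - 4);
  }
}

You can register it with a static WritableComparator.define(MyKey.class,
new MyKeyComparator()) block in the key class, or set it with
JobConf.setOutputKeyComparatorClass.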


-- Owen


Re: core-user Digest 23 Apr 2009 02:09:48 -0000 Issue 887

2009-04-23 Thread Owen O'Malley


On Apr 22, 2009, at 10:44 PM, Koji Noguchi wrote:


Nigel,

When you have time, could you release 0.18.4 that contains some of the
patches that make our clusters 'stable'?


Is it just the patches that have already been applied to the 18  
branch? Or are there more?


-- Owen


Re: Is combiner and map in same JVM?

2009-04-14 Thread Owen O'Malley


On Apr 14, 2009, at 11:10 AM, Saptarshi Guha wrote:

Thanks. I am using 0.19, and to confirm, the map and combiner (in  
the map jvm) are run in *different* threads at the same time?


And the change was actually made in 0.18. So since then, the combiner
is called 0, 1, or many times on each key in both the mapper and the
reducer. It is called in a separate thread from the base application
in the map (in the reduce task, the combiner is only used during the
shuffle).


My native library is not thread safe, so I would have to implement  
locks. Aaron's email gave me hope(since the map and combiner would  
then be running sequentially), but this appears to make things  
complicated.


Yes, you'll probably need locks around your code that isn't thread safe.

-- Owen




Re: Multithreaded Reducer

2009-04-13 Thread Owen O'Malley


On Apr 10, 2009, at 11:12 AM, Sagar Naik wrote:


Hi,
I would like to implement a Multi-threaded reducer.
As per my understanding, the system does not have one because we expect
the output to be sorted.


However, in my case I dont need the output sorted.


You'd probably want to make a blocking concurrent queue of the key/value
pairs that are given to the reducer. Then have a pool of reducers that
pull from the queue. It can be modeled on the multi-threaded map runner.
Do be aware that you'll need to clone the keys and values that are given
to the reduce.


-- Owen


Re: HDFS read/write speeds, and read optimization

2009-04-10 Thread Owen O'Malley
On Thu, Apr 9, 2009 at 9:30 PM, Brian Bockelman bbock...@cse.unl.edu wrote:

 On Apr 9, 2009, at 5:45 PM, Stas Oskin wrote:

 Hi.

 I have 2 questions about HDFS performance:

 1) How fast are the read and write operations over network, in Mbps per
 second?


 Depends.  What hardware?  How much hardware?  Is the cluster under load?
  What does your I/O load look like?  As a rule of thumb, you'll probably
 expect very close to hardware speed.

For comparison, on a 1400 node cluster, I can checksum 100 TB in
around 10 minutes, which means I'm seeing read averages of roughly 166
GB/sec. For writes with replication of 3, I see roughly 40-50 minutes
to write 100TB, so roughly 33 GB/sec average. Of course the peaks are
much higher. Each node has 4 SATA disks, dual quad core, and 8 GB of
ram.

-- Owen


Re: HDFS read/write speeds, and read optimization

2009-04-10 Thread Owen O'Malley


On Apr 10, 2009, at 9:07 AM, Stas Oskin wrote:

From your experience, how RAM-hungry is HDFS? Meaning, would an additional
4GB of RAM (to make it 8GB as in your case) really change anything?


I don't think the 4 to 8GB would matter much for HDFS. For Map/Reduce,  
it is very important.


-- Owen


Re: How many people is using Hadoop Streaming ?

2009-04-07 Thread Owen O'Malley


On Apr 7, 2009, at 11:41 AM, Aaron Kimball wrote:


Owen,

Is binary streaming actually readily available?


https://issues.apache.org/jira/browse/HADOOP-1722



Re: problem running on a cluster of mixed hardware due to Incompatible buildVersion of JobTracker adn TaskTracker

2009-04-06 Thread Owen O'Malley

This was discussed over on:

https://issues.apache.org/jira/browse/HADOOP-5203

Doug uploaded a patch, but no one seems to be working on it.

-- Owen


Re: connecting two clusters

2009-04-06 Thread Owen O'Malley


On Apr 6, 2009, at 9:49 PM, Mithila Nagendra wrote:


Hey all
I'm trying to connect two separate Hadoop clusters. Is it possible  
to do so?
I need data to be shuttled back and forth between the two clusters.  
Any

suggestions?


You should use hadoop distcp. It is a map/reduce program that copies  
data, typically from one cluster to another. If you have the hftp  
interface enabled, you can use that to copy between hdfs clusters that  
are different versions.


hadoop distcp hftp://namenode1:1234/foo/bar hdfs://foo/bar

-- Owen


Re: best practice: mapred.local vs dfs drives

2009-04-05 Thread Owen O'Malley

We always share the drives.

-- Owen

On Apr 5, 2009, at 0:52, zsongbo zson...@gmail.com wrote:

I usually set mapred.local.dir to share the disk space with DFS, since some

mapreduce jobs need big temp space.



On Fri, Apr 3, 2009 at 8:36 PM, Craig Macdonald  
cra...@dcs.gla.ac.ukwrote:



Hello all,

Following recent hardware discussions, I thought I'd ask a related
question. Our cluster nodes have 3 drives: 1x 160GB system/scratch  
and 2x

500GB DFS drives.

The 160GB system drive is partitioned such that 100GB is for job
mapred.local space. However, we find that for our application,  
mapred.local
free space for map output space is the limiting parameter on the  
number of

reducers we can have (our application prefers less reducers).

How do people normally work for dfs vs mapred.local space. Do you  
(a) share
the DFS drives with the task tracker temporary files, Or do you (b)  
keep

them on separate partitions or drives?

We originally went with (b) because it prevented a run-away job  
from eating
all the DFS space on the machine, however, I'm beginning to realise  
the

disadvantages.

Any comments?

Thanks

Craig




Re: Users are not properly authenticated in Hadoop

2009-04-05 Thread Owen O'Malley


On Apr 5, 2009, at 3:30 AM, Foss User wrote:


However, I see that anyone can easily impersonate as fossist


Yes, it is easy to work around the security in Hadoop. It is only  
intended to prevent accidents, such as the time a student accidently  
wiped out the entire class' home directories.



If not, I have to go for other measures like firewalls, etc. But if
something is available in Hadoop itself, it would be great.


You currently need firewalls to protect Hadoop. There is work underway
to add real authentication to Hadoop, which will enable real security.


-- Owen


Re: How many people is using Hadoop Streaming ?

2009-04-03 Thread Owen O'Malley


On Apr 3, 2009, at 9:42 AM, Ricky Ho wrote:


Has anyone benchmarked the performance difference of using Hadoop?
 1) Java vs C++
 2) Java vs Streaming


Yes, a while ago. When I tested it using sort, Java and C++ were  
roughly equal and streaming was 10-20% slower. Most of the cost with  
streaming came from the stringification.


 1) I can pick the language that offers a different programming
paradigm (e.g. I may choose a functional language, or logic
programming if they suit the problem better).  In fact, I can even
choose Erlang for the map() and Prolog for the reduce().  Mix and
match can optimize things more.
 2) I can pick the language that I am familiar with, or one that I
like.
 3) Easy to switch to another language in a fine-grained incremental
way if I choose to do so in future.


Additionally, the interface to streaming is very stable. *smile* It  
also supports legacy applications well.


The downsides are that:
  1. The interface is very thin and has minimal functionality.
  2. Streaming combiners don't work very well. Many streaming  
applications buffer in the map

  and run the combiner internally.
  3. Streaming doesn't group the values in the reducer. In Java or C++, you get:

 key1, (value1, value2, ...)
 key2, (value3, ...)
  in streaming you get
 key1 value1
 key1 value2
 key2 value3
  and your application needs to detect the key changes.
  4. Binary data support has only recently been added to streaming.

Am I missing something here ?  or is the majority of Hadoop  
applications written in Hadoop Streaming ?


On Yahoo's research clusters, typically 1/3 of the applications are  
streaming, 1/3 pig, and 1/3 java.


-- Owen


Re: How many people is using Hadoop Streaming ?

2009-04-03 Thread Owen O'Malley


On Apr 3, 2009, at 10:35 AM, Ricky Ho wrote:

I assume that the key is still sorted, right? That means I will get
all the (key1, valueX) entries before getting any of the (key2, valueY)
entries, and key2 is always bigger than key1.


Yes.

-- Owen


Re: Hadoop/HDFS for scientific simulation output data analysis

2009-04-03 Thread Owen O'Malley

On Apr 3, 2009, at 1:41 PM, Tu, Tiankai wrote:


By the way, what is the largest size---in terms of total bytes, number
of files, and number of nodes---in your applications? Thanks.


The largest Hadoop application that has been documented is the Yahoo  
Webmap.


10,000 cores
500 TB shuffle
300 TB compressed final output

http://developer.yahoo.net/blogs/hadoop/2008/02/yahoo-worlds-largest-production-hadoop.html

-- Owen


Re: datanode but not tasktracker

2009-04-01 Thread Owen O'Malley


On Apr 1, 2009, at 12:58 AM, Sandhya E wrote:


Hi

When the host is listed in slaves file both DataNode and TaskTracker
are started on that host. Is there a way in which we can configure a
node to be datanode and not tasktracker.


If you use hadoop-daemons.sh, you can pass a host list. So do:

ssh namenode hadoop-daemon.sh start namenode
ssh jobtracker hadoop-daemon.sh start jobtracker
hadoop-daemons.sh -hosts dfs.slaves start datanode
hadoop-daemons.sh -hosts mapred.slaves start tasktracker

-- Owen


Re: Reduce doesn't start until map finishes

2009-03-24 Thread Owen O'Malley
What happened is that we added fast start (HADOOP-3136), which
launches more than one task per heartbeat. Previously, if your maps
didn't take very long, they finished before the heartbeat and the task
tracker was assigned a new map task. A side effect was that no reduce
tasks were launched until the maps were complete, which prevents the
shuffle from overlapping with the maps.

-- Owen


ApacheCon EU 2009 this week

2009-03-23 Thread Owen O'Malley
 ApacheCon EU 2009 is in Amsterdam this week, with a lot of talks on  
Hadoop. There are also going to be a lot of the committers there,  
including Doug Cutting. There is no word yet whether he is bringing  
the original Hadoop as seen in the NY Times.


This year the live video streaming includes the Hadoop track. The  
video of the keynote and lunch talks are free, but there is a charge  
for the Hadoop track.


The Hadoop track this year includes:

* Opening Keynote - Data Management in the Cloud - Raghu  
Ramakrishnan

* Introduction to Hadoop - Owen O'Malley
* Hadoop Map-Reduce: Tuning and Debugging - Arun Murthy
* Pig: Making Hadoop Easy - Olga Natkovich
* Running Hadoop in the Cloud - Tom White
* Configuring Hadoop for Grid Services - Allen Wittenauer
* Dynamic Hadoop Clusters - Steve Loughran

-- Owen



Re: Sorting data numerically

2009-03-23 Thread Owen O'Malley


On Mar 23, 2009, at 9:15 AM, tim robertson wrote:


If Akira was to write his/her own Mappers, using types like
IntWritable would result in it being numerically sorted right?


Yes.

Or they can use the KeyFieldBasedComparator. I think if you put the  
following in your job conf, you'll get the right behavior.


mapred.output.key.comparator.class = org.apache.hadoop.mapred.lib.KeyFieldBasedComparator
mapred.text.key.comparator.options = -n
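
The same thing from Java would look roughly like this (assuming the
org.apache.hadoop.mapred.lib.KeyFieldBasedComparator class; MyJob is a
placeholder):

JobConf conf = new JobConf(MyJob.class);   // MyJob is a placeholder
conf.setOutputKeyComparatorClass(
    org.apache.hadoop.mapred.lib.KeyFieldBasedComparator.class);
conf.set("mapred.text.key.comparator.options", "-n");   // numeric sort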

-- Owen


Re: Coordination between Mapper tasks

2009-03-19 Thread Owen O'Malley


On Mar 18, 2009, at 10:26 AM, Stuart White wrote:


I'd like to implement some coordination between Mapper tasks running
on the same node.  I was thinking of using ZooKeeper to provide this
coordination.


This is a very bad idea in the general case. It can be made to work,  
but you need to have a dedicated cluster so that you are sure they are  
all active at the same time. Otherwise, you have no guarantee that all  
of the maps are running at the same time.


In most cases, you are much better off using the standard  
communication between the maps and reduces and making multiple passes  
of jobs.



I think I remember hearing that MapReduce and/or HDFS use ZooKeeper
under-the-covers.


There are no immediate plans to implement HA yet.

-- Owen


Re: Numbers of mappers and reducers

2009-03-17 Thread Owen O'Malley


On Mar 17, 2009, at 9:18 AM, Richa Khandelwal wrote:


I was going through FAQs on Hadoop to optimize the performance of
map/reduce. There is a suggestion to set the number of reducers to a  
prime
number closest to the number of nodes and number of mappers a prime  
number

closest to several times the number of nodes in the cluster.


There is no need for the number of reduces to be prime. The only thing  
it helps is if you are using the HashPartitioner and your key's hash  
function is too linear. In practice, you usually want to use 99% of  
your reduce capacity of the cluster.


-- Owen


Re: Release batched-up output records at end-of-job?

2009-03-17 Thread Owen O'Malley


On Mar 17, 2009, at 5:53 AM, Stuart White wrote:


Yeah, I thought of that, but I was concerned that, even if it did
work, if it wasn't guaranteed behavior, that it might stop working in
a future release.  I'll go ahead and give that a try.


No, it will continue to work. It is required for both streaming and  
pipes to work.



Can anybody provide details on this new API?


Sure, it is in the upcoming 0.20. I made the package  
org.apache.hadoop.mapreduce so that I could have the new API running  
in parallel with the old one. You can see the new API in subversion:


http://svn.apache.org/repos/asf/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/

-- Owen


Re: Massive discrepancies in job's bytes written/read

2009-03-17 Thread Owen O'Malley


On Mar 17, 2009, at 7:44 PM, Bryan Duxbury wrote:


There is no compression in the mix for us, so that's not the culprit.

I'd be sort of willing to believe that spilling and sorting play a  
role in this, but, wow, over 10x read and write? That seems like a  
big problem.


It happened recently to me too. It was off by 6x. The strange thing  
was that the individual tasks looked right. It was just the aggregate  
that was wrong.


-- Owen


Re: Temporary files for mapppers and reducers

2009-03-15 Thread Owen O'Malley
Just use the current working directory. Each task gets a unique
directory that is erased when the task finishes.


-- Owen

On Mar 15, 2009, at 16:08, Mark Kerzner markkerz...@gmail.com wrote:


Hi,

what would be the best place to put temporary files for a reducer? I  
believe
that since reducers each work on its own machine, at its own time,  
one can

do anything, but I would like a confirmation from the experts.

Thanks,
Mark


Re: null value output from map...

2009-03-13 Thread Owen O'Malley


On Mar 13, 2009, at 3:56 PM, Richa Khandelwal wrote:


You can initialize IntWritable with an empty constructor.
IntWritable i=new IntWritable();


NullWritable is better for that application than IntWritable. It  
doesn't consume any space when serialized. *smile*


-- Owen


Re: Does hadoop-default.xml + hadoop-site.xml matter for whole cluster or each node?

2009-03-09 Thread Owen O'Malley

On Mar 7, 2009, at 10:56 PM, pavelkolo...@gmail.com wrote:



Does hadoop-default.xml + hadoop-site.xml of the master host matter
for the whole job, or do they matter for each node independently?


Please never modify hadoop-default. That is for the system defaults.  
Please use hadoop-site for your configuration.


It depends on the property whether they come from the job's
configuration or the system's. Some, like io.sort.mb and
mapred.map.tasks, come from the job, while others, like
mapred.tasktracker.map.tasks.maximum, come from the system. The job
parameters come from the submitting client, while the system
parameters need to be distributed to each worker node.


-- Owen


For example, if one of them (or both) contains:
<property>
  <name>mapred.map.tasks</name>
  <value>6</value>
</property>

then does it mean that six mappers will be executed across all nodes, or 6
on each node?


That means that your job will default to 6 maps.  
mapred.tasktracker.map.tasks.maximum specifies the number of maps  
running on each node.


And yes, we really should do a cleanup of the property names to do  
something like:


mapred.job.*
mapred.system.*

to separate the job from the system parameters.

-- Owen


Re: question about released version id

2009-03-09 Thread Owen O'Malley

On Mar 2, 2009, at 11:46 PM, 鞠適存 wrote:

I wonder how to make the hadoop version number.


Each of 0.18, 0.19, and 0.20 has its own branch. The first release on
each branch is 0.X.0, and then 0.X.1 and so on. New features are only
put into trunk and only important bug fixes are put into the branches.
So there will be no new functionality going from 0.X.1 to 0.X.2, but
there will be going from a release of 0.X to 0.X+1.


-- Owen

Re: Reducer goes past 100% complete?

2009-03-09 Thread Owen O'Malley


On Mar 9, 2009, at 1:00 PM, james warren wrote:

Speculative execution has existed far before 0.19.x, but AFAIK the >100%
issue has appeared (at least with greater frequency) since 0.19.0 came out.
Are you saying there are changes in how task progress is being
tracked?


In the past, it has usually been when the input is compressed and the  
code is doing something like uncompressed / total compressed to figure  
out the done percent.


-- Owen


Re: MapReduce jobs with expensive initialization

2009-03-02 Thread Owen O'Malley


On Mar 2, 2009, at 3:03 AM, Tom White wrote:


I believe the static singleton approach outlined by Scott will work
since the map classes are in a single classloader (but I haven't
actually tried this).


Even easier, you should just be able to do it with static  
initialization in the Mapper class. (I haven't tried it either... )


-- Owen


Re: Shuffle phase

2009-02-26 Thread Owen O'Malley


On Feb 26, 2009, at 2:03 PM, Nathan Marz wrote:

Do the reducers batch copy map outputs from a machine? That is, if a  
machine M has 15 intermediate map outputs destined for machine R,  
will machine R copy the intermediate outputs one at a time or all at  
once?


Currently, one at a time. In 0.21 it will be batched up.

-- Owen


Re: Batching key/value pairs to map

2009-02-23 Thread Owen O'Malley
On Mon, Feb 23, 2009 at 12:06 PM, Jimmy Wan ji...@indeed.com wrote:

 part of my map/reduce process could be greatly sped up by mapping
 key/value pairs in batches instead of mapping them one by one.
 Can I safely hang onto my OutputCollector and Reporter from calls to map?


Yes. You can even use them in the close, so that you can process the last
batch of records. *smile* One problem that you will quickly hit is that
Hadoop reuses the objects that are passed to map and reduce. So, you'll need
to clone them before putting them into the collection.

I'm currently running Hadoop 0.17.2.1. Is this something I could do in
 Hadoop 0.19.X?


I don't think any of this changed between 0.17 and 0.19, other than in 0.17
the reduce's inputs were always new objects. In 0.18 and after, the reduce's
inputs are reused.

-- Owen


Re: Batching key/value pairs to map

2009-02-23 Thread Owen O'Malley


On Feb 23, 2009, at 2:19 PM, Jimmy Wan wrote:


 I'm not sure if this is
possible, but it would certainly be nice to either:
1) pass the OutputCollector and Reporter to the close() method.
2) Provide accessors to the OutputCollector and the Reporter.


If you look at the 0.20 branch, which hasn't released yet, there is a  
new map/reduce api. That api does provide a lot more control. Take a  
look at Mapper, which provide setup, map, and cleanup hooks:


http://tinyurl.com/bquvxq

The map method looks like:

  /**
   * Called once for each key/value pair in the input split. Most
   * applications should override this, but the default is the identity
   * function.
   */
  @SuppressWarnings("unchecked")
  protected void map(KEYIN key, VALUEIN value,
                     Context context) throws IOException, InterruptedException {
    context.write((KEYOUT) key, (VALUEOUT) value);
  }

But there is also a run method that drives the task. The default is  
given below, but it can be overridden by the application.


  /**
   * Expert users can override this method for more complete control over
   * the execution of the Mapper.
   * @param context
   * @throws IOException
   */
  public void run(Context context) throws IOException, InterruptedException {
    setup(context);
    while (context.nextKeyValue()) {
      map(context.getCurrentKey(), context.getCurrentValue(), context);
    }
    cleanup(context);
  }

Clearly, in your application you could override run to make a list of  
100 key, value pairs or something.
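
For illustration, a rough sketch of such an override (BatchingMapper and
processBatch are made-up names, not part of the API):

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class BatchingMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
  private static final int BATCH = 100;

  @Override
  public void run(Context context) throws IOException, InterruptedException {
    setup(context);
    List<String> batch = new ArrayList<String>(BATCH);
    while (context.nextKeyValue()) {
      batch.add(context.getCurrentValue().toString());  // copy: objects are reused
      if (batch.size() == BATCH) {
        processBatch(batch, context);
        batch.clear();
      }
    }
    if (!batch.isEmpty()) {
      processBatch(batch, context);   // flush the final partial batch
    }
    cleanup(context);
  }

  private void processBatch(List<String> values, Context context)
      throws IOException, InterruptedException {
    // application-specific batch processing and context.write(...) go here
  }
}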


-- Owen


Re: Is there a way to tell whether you're in a map task or a reduce task?

2009-02-10 Thread Owen O'Malley


On Feb 10, 2009, at 5:20 PM, Matei Zaharia wrote:


I'd like to write a combiner that shares a lot of code with a reducer,
except that the reducer updates an external database at the end.


The right way to do this is to either do the update in the output  
format or do something like:


class MyCombiner implements Reducer {
  ...
  public void close() throws IOException {}
}

class MyReducer extends MyCombiner {
  ...
  public void close() throws IOException { ... update database ... }
}


As far as I
can tell, since both combiners and reducers must implement the Reducer
interface, there is no way to have this be the same class.


There are ways to do it, but they are likely to change.


Is there a
recommended way to test inside the task whether you're running as a  
combiner

(in a map task) or as a reducer?


The question is worse than you think. In particular, the question is  
*not* are you in a map or reduce task. With current versions of  
Hadoop, the combiner can be called in the context of the reduce as  
well as the map. You really want to know if you are in a Reducer or  
Combiner context.


If not, I think this might be an interesting thing to support in the  
Hadoop

1.0 API.


It probably does make sense to add to ReduceContext.isCombiner() to  
answer the question. In practice, usually if someone wants to use  
*almost* the same code for combiner and reducer, I get suspicious of  
their design.



It would enable people to write an AbstractJob class where you just
implement map, combine and reduce functions, and can thus write  
MapReduce

jobs in a single Java class.


The old api allowed this, since both Mapper and Reducer were  
interfaces. The new api doesn't because they are both classes. It  
wouldn't be hard to make a set of adaptors in library code that would  
work. Basically, you would define a job with SimpleMapper,  
SimpleCombiner, and SimpleReducer that would call Task.map,  
Task.combine, and Task.reduce.


-- Owen


Re: only one reducer running in a hadoop cluster

2009-02-09 Thread Owen O'Malley


On Feb 7, 2009, at 11:52 PM, Nick Cen wrote:


Hi,

I hava a hadoop cluster with 4 pc. And I wanna to integrate hadoop and
lucene together, so i copy some of the source code from nutch's  
Indexer
class, but when i run my job, i found that there is only 1 reducer  
running

on 1 pc, so the performance is not as far as expect.


Set mapred.reduce.tasks in your configuration to the number of reduces
you want your jobs to have by default. Typically this should
be 0.99 * mapred.tasktracker.reduce.tasks.maximum * number of computers.


Re: can't read the SequenceFile correctly

2009-02-09 Thread Owen O'Malley


On Feb 6, 2009, at 8:52 AM, Bhupesh Bansal wrote:


Hey Tom,

I also got burned by this. Why does BytesWritable.getBytes() return
non-valid bytes? Or should we add a BytesWritable.getValidBytes()
kind of function?


It does it because continually resizing the array to the valid  
length is very expensive. It would be a reasonable patch to add a  
getValidBytes, but most methods in Java's libraries are aware of this  
and let you pass in byte[], offset, and length. So once you realize  
what the problem is, you can work around it.
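
A minimal sketch of the work-around (note that older releases call the
length accessor getSize() rather than getLength()):

import java.io.IOException;
import java.io.OutputStream;
import org.apache.hadoop.io.BytesWritable;

public class BytesWritableExample {
  // Only the first getLength() bytes of the backing array are valid.
  public static void writeValid(BytesWritable value, OutputStream out)
      throws IOException {
    out.write(value.getBytes(), 0, value.getLength());  // not getBytes().length
  }
}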


-- Owen


Re: TaskTrackers being double counted after restart job recovery

2009-02-09 Thread Owen O'Malley
There is a bug that when we restart the TaskTrackers they get counted twice.
The problem is the name is generated from the hostname and port number. When
TaskTrackers restart they get a new port number and get counted again. The
problem goes away when the old TaskTrackers time out in 10 minutes or you
restart the JobTracker.

-- Owen


Re: Set the Order of the Keys in Reduce

2009-01-22 Thread Owen O'Malley


On Jan 22, 2009, at 7:25 AM, Brian MacKay wrote:

Is there a way to set the order of the keys in reduce as shown  
below, no

matter what order the collection in MAP occurs in.


The keys to reduce are *always* sorted. If the default order is not  
correct, you can change the compare function.


As Tom points out, the critical thing is making sure that all of the  
keys that you need to group together go to the same reduce. So let's  
make it a little more concrete and say that you have:


public class TextPair implements Writable {
  public TextPair() {}
  public void set(String left, String right);
  public String getLeft();
  ...
}

And your map 0 does:
  key.set("CAT", "B");
  output.collect(key, value);
  key.set("DOG", "A");
  output.collect(key, value);

While map 1 does:
  key.set("CAT", "A");
  output.collect(key, value);
  key.set("DOG", "B");
  output.collect(key, value);

If you want to make sure that all of the cats go to the same reduce
and that the dogs go to the same reduce, you would need to set the
partitioner. It would look like:

public class MyPartitioner<V> implements Partitioner<TextPair, V> {

  public void configure(JobConf job) {}

  public int getPartition(TextPair key, V value,
                          int numReduceTasks) {
    return (key.getLeft().hashCode() & Integer.MAX_VALUE) % numReduceTasks;
  }
}

Then define a raw comparator that sorts based on both the left and  
right part of the TextPair, and you are set.


-- Owen


Re: Maven repo for Hadoop

2009-01-19 Thread Owen O'Malley


On Jan 17, 2009, at 5:53 PM, Chanwit Kaewkasi wrote:


I would like to integrate Hadoop to my project using Ivy.
Is there any maven repository containing Hadoop jars that I can point
my configuration to?


Not yet, but soon. We recently introduced ivy into Hadoop, so I  
believe we'll upload the pom and jar for 0.20.0 when it is released.


-- Owen


Re: Merging reducer outputs into a single part-00000 file

2009-01-14 Thread Owen O'Malley

On Jan 14, 2009, at 12:46 AM, Rasit OZDAS wrote:


Jim,

As far as I know, there is no operation done after Reducer.


Correct, other than output promotion, which moves the output file to  
the final filename.



But if you  are a little experienced, you already know these.
Ordered list means one final file, or am I missing something?


There is no value and a lot of cost associated with creating a single  
file for the output. The question is how you want the keys divided  
between the reduces (and therefore output files). The default  
partitioner hashes the key and mods by the number of reduces, which  
stripes the keys across the output files. You can use the  
mapred.lib.InputSampler to generate good partition keys and  
mapred.lib.TotalOrderPartitioner to get completely sorted output based  
on the partition keys. With the total order partitioner, each reduce  
gets an increasing range of keys and thus has all of the nice  
properties of a single file without the costs.
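
For illustration, a rough sketch of wiring those two classes together in the
job setup (the path and sampler parameters are made up; imports from
org.apache.hadoop.mapred.lib and org.apache.hadoop.fs are assumed):

JobConf conf = new JobConf(MySortJob.class);                 // placeholder job class
conf.setPartitionerClass(TotalOrderPartitioner.class);

// Where the sampled partition keys get written; must be visible to all tasks.
Path partitionFile = new Path("/tmp/_partitions");
TotalOrderPartitioner.setPartitionFile(conf, partitionFile);

// Sample roughly 1% of the input, at most 10000 keys from at most 10 splits.
InputSampler.Sampler<Text, Text> sampler =
    new InputSampler.RandomSampler<Text, Text>(0.01, 10000, 10);
InputSampler.writePartitionFile(conf, sampler);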


-- Owen


Re: General questions about Map-Reduce

2009-01-11 Thread Owen O'Malley

On Jan 11, 2009, at 5:50 AM, tienduc_dinh wrote:


- Does Map-Reduce support parallel writing/reading ?


Yes. The maps all read in parallel with each other and the reduces all  
write in parallel with each other.



- What happens after the Map-Reduce operation ?


The OutputFormat usually writes the output to HDFS.

-- Owen


Re: General questions about Map-Reduce

2009-01-11 Thread Owen O'Malley


On Jan 11, 2009, at 7:05 PM, tienduc_dinh wrote:


Is there any article which describes it ?


Please read the map/reduce tutorial:

http://hadoop.apache.org/core/docs/current/mapred_tutorial.html

-- Owen


Re: OutputCollector to print only keys

2009-01-08 Thread Owen O'Malley

On Jan 8, 2009, at 4:13 AM, Sandhya E wrote:


Does Hadoop 0.18 have an outputcollector that can print out only the
keys and supress the values. In our case we need only the keys, and
hence we do output.collect(key, blank) and then reprocess the entire
output file to remove trailing tabs.


If you are using TextOutputFormat, you can either emit NullWritables  
or null. So do:


output.collect(key, null);

and it should work fine.

-- Owen


Re: correct pattern for using setOutputValueGroupingComparator?

2009-01-05 Thread Owen O'Malley
This is exactly what the setOutputValueGroupingComparator is for. Take
a look at HADOOP-4545 for an example using the secondary sort. If you
are using trunk or 0.20, look at
src/examples/org/apache/hadoop/examples/SecondarySort.java. The checked
in example uses the new map/reduce api that was introduced in 0.20.


-- Owen


Re: Map input records(on JobTracker website) increasing and decreasing

2009-01-05 Thread Owen O'Malley

Yeah, I've seen this too. I just filed HADOOP-4983.

-- Owen


Re: Output.collect uses toString for custom key class. Is it possible to change this?

2008-12-17 Thread Owen O'Malley


On Dec 16, 2008, at 9:30 AM, David Coe wrote:


Since the SequenceFileOutputFormat doesn't like nulls, how would I use
NullWritable?  Obviously output.collect(key, null) isn't working.   
If I

change it to output.collect(key, new IntWritable()) I get the result I
want (plus an int that I don't), but output.collect(key, new
NullWritable()) does not work.


Sorry, I answered you literally. You can write a SequenceFile with  
NullWritables as the values, but you really want optional nulls. I'd  
probably define a Wrapper class like GenericWritable. It would look  
something like:


class NullableWritable<T extends Writable> implements Writable {
  private T instance;
  private boolean isNull;
  public void setNull(boolean isNull) {
    this.isNull = isNull;
  }
  public void readFields(DataInput in) throws IOException {
    isNull = in.readBoolean();
    if (!isNull) {
      instance.readFields(in);
    }
  }
  public void write(DataOutput out) throws IOException {
    out.writeBoolean(isNull);
    if (!isNull) {
      instance.write(out);
    }
  }
}

-- Owen



Re: Output.collect uses toString for custom key class. Is it possible to change this?

2008-12-16 Thread Owen O'Malley


On Dec 16, 2008, at 8:28 AM, David Coe wrote:


Is there a way to output the write() instead?



Use SequenceFileOutputFormat. It writes binary files using the write.  
The reverse is SequenceFileInputFormat, which reads the sequence files  
using readFields.


-- Owen


Re: [video] visualization of the hadoop code history

2008-12-16 Thread Owen O'Malley


On Dec 16, 2008, at 12:36 AM, Stefan Groschupf wrote:

It is a neat way of visualizing who is behind the Hadoop source code  
and how the project code base grew over the years.


It is interesting, but it would be more interesting to track the  
authors of the patch rather than the committer. The two are rarely the  
same. I've got some scripts for parsing the CHANGES.txt that pull all  
of that apart for each jira. I really should figure out a good place  
to check those in. *smile*


-- Owen


Re: Output.collect uses toString for custom key class. Is it possible to change this?

2008-12-16 Thread Owen O'Malley


On Dec 16, 2008, at 9:14 AM, David Coe wrote:


Does the SequenceFileOutputFormat work with NullWritable as the value?


Yes.


Re: concurrent modification of input files

2008-12-15 Thread Owen O'Malley


On Dec 15, 2008, at 8:08 AM, Sandhya E wrote:


I have a scenario where, while a map/reduce is working on a file, the
input file may get deleted and copied with a new version of the file.
All my files are compressed and hence each file is worked on by a
single node. I tried simulating the scenario of deleting the file
before mapreduce was over, and map/reduce went ahead without
complaining. Can I assume this will be the case always.


No, the results will be completely non-deterministic. Don't do this.  
That said, the thing that will save you in micro-tests of this is that  
if the file is missing at some point, the task will fail and retry.


-- Owen


Re: Simple data transformations in Hadoop?

2008-12-14 Thread Owen O'Malley


On Dec 13, 2008, at 6:32 PM, Stuart White wrote:

First question: would Hadoop be an appropriate tool for something  
like this?


Very


What is the best way to model this type of work in Hadoop?


As a map-only job with number of reduces = 0.


I'm thinking my mappers will accept a Long key that represents the
byte offset into the input file, and a Text value that represents the
line in the file.


Sure, just use TextInputFormat. You'll want to set the minimum split  
size (mapred.min.split.size) to a large number so that you get exactly  
1 map per an input file.



I *could* simply uppercase the text lines and write them to an output
file directly in the mapper (and not use any reducers).  So, there's a
question: is it considered bad practice to write output files directly
from mappers?


You could do it directly, but I would suggest that using the  
TextOutputFormat is easier.


Your map should just do:
  collect(null, upperCaseLine);

Assuming that number of reduces is 0, the output of the map goes  
straight to the OutputCollector.



Assuming it's advisable in this example to write a file directly in
the mapper - how should the mapper create a unique output partition
file name?  Is there a way for a mapper to know its index in the total
# of mappers?


Get mapred.task.partition from the configuration.


Assuming it's inadvisable to write a file directly in the mapper - I
can output the records to the reducers using the same key and using
the uppercased data as the value.  Then, in my reducer, should I write
a file?  Or should I collect() the records in the reducers and let
hadoop write the output?


See above, but with no reduces the data is not sorted. If you pass a  
null or NullWritable to the TextOutputFormat, it will not add the tab.
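
Putting those pieces together, a rough sketch of the map-only mapper (the
class name is made up):

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

public class UpperCaseMapper extends MapReduceBase
    implements Mapper<LongWritable, Text, NullWritable, Text> {
  private final Text upper = new Text();

  public void map(LongWritable offset, Text line,
                  OutputCollector<NullWritable, Text> output, Reporter reporter)
      throws IOException {
    upper.set(line.toString().toUpperCase());
    // With zero reduces and TextOutputFormat, the NullWritable key means the
    // value is written without a leading tab.
    output.collect(NullWritable.get(), upper);
  }
}

In the driver you would set conf.setNumReduceTasks(0) and a large
mapred.min.split.size, as described above.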


-- Owen


Re: Suggestions of proper usage of key parameter ?

2008-12-14 Thread Owen O'Malley


On Dec 14, 2008, at 4:47 PM, Ricky Ho wrote:

Yes, I am referring to the key INPUT INTO the map() function and  
the key EMITTED FROM the reduce() function.  Can someone explain  
why do we need a key in these cases and what is the proper use of  
it ?


It was a design choice and could have been done as:

R1 -> map -> K,V -> reduce -> R2

instead of

K1,V1 -> map -> K2,V2 -> reduce -> K3,V3

but since the input of the reduce is sorted on K2, the output of the  
reduce is also typically sorted and therefore keyed. Since jobs are  
often chained together, it makes sense to make the reduce input match  
the map input. Of course everything you could do with the first option  
is possible with the second using either K1 = R1 or V1 = R1. Having  
the keys is often convenient...


Who determines what the key should be ?  (by the corresponding  
InputFormat implementation class) ?


The InputFormat makes the choice.

In this case, what is the key in the map() call ?  (name of the  
input file) ?


TextInputFormat uses the byte offset as the key and the line as the  
value.


What if the reduce() function emits multiple key, value entries or  
not emitting any entry at all ?  Is this considered OK ?


Yes.

What if the reduce() function emits a key, value entry whose key  
is not the same as the input key parameter to the reduce()  
function ?  Is this OK ?


Yes, although the reduce output is not re-sorted, so the results won't  
be sorted unless K3 is a subset of K2.


If there is a two Map/Reduce cycle chained together.  Is the key  
input into the 2nd round map() function determined by the key  
emitted from the 1st round reduce() function ?


Yes.

-- Owen


Re: Reset hadoop servers

2008-12-09 Thread Owen O'Malley


On Dec 9, 2008, at 2:22 AM, Devaraj Das wrote:


I know that the tasktracker/jobtracker doesn't have any command for
re-reading the configuration. There is built-in support for
restart/shut-down but those are via external scripts that internally  
do a

kill/start.


Actually, HADOOP-4348 does add a refresh command to map/reduce. I'm  
not sure if it re-reads the entire config or just part of it.


-- Owen


Re: End of block/file for Map

2008-12-09 Thread Owen O'Malley


On Dec 9, 2008, at 11:35 AM, Songting Chen wrote:


Is there a way for the Map process to know it's the end of records?

I need to flush some additional data at the end of the Map process,  
but wondering where I should put that code.


The close() method is called at the end of the map.

-- Owen


Re: End of block/file for Map

2008-12-09 Thread Owen O'Malley


On Dec 9, 2008, at 7:34 PM, Aaron Kimball wrote:


That's true, but you should be aware that you no longer have an
OutputCollector available in the close() method.


True, but in practice you can keep a handle to it from the map method
and it will work perfectly. This is required for both streaming and
pipes to work. (Both of them do their processing asynchronously, so
the close needs to wait for the subprocess to finish. Because of this,
the contract with the Mapper and Reducer is very loose and the
collect method may be called in between calls to the map method.) In
the context object api (HADOOP-1230), the api will include the context
object in cleanup, to make it clear that cleanup can also write records.


-- Owen


Re: How are records with equal key sorted in hadoop-0.18?

2008-12-08 Thread Owen O'Malley

On Dec 8, 2008, at 8:02 AM, Christian Kunz wrote:


Comparing hadoop-default.xml of hadoop-0.18 with hadoop-0.17, didn't
map.sort.class change from
org.apache.hadoop.mapred.MergeSorter to
org.apache.hadoop.util.QuickSort?


Yes, but the quick sort is only used in the mapper. The reducer  
already has sorted runs and therefore only needs an external merge sort.


The primary change in 0.18 for the reducer was HADOOP-2095. What were  
the values of io.sort.factor and io.file.buffer.size?


Christian, can you get the heap profile for one of the reduces that is  
failing?


mapred.task.profile=true
mapred.task.profile.maps= (empty string, since we don't want any maps)
mapred.task.profile.reduces= (number of the reduce to profile)
mapred.task.profile.params=-agentlib:hprof=heap=sites,force=n,thread=y,verbose=n,file=%s


-- Owen


Re: getting Configuration object in mapper

2008-12-05 Thread Owen O'Malley


On Dec 4, 2008, at 9:19 PM, abhinit wrote:


I have set some variable using the JobConf object.

jobConf.set("Operator", operator) etc.

How can I get an instance of Configuration object/ JobConf object  
inside

a map method so that I can retrieve these variables.


In your Mapper class, implement a method like:
 public void configure(JobConf job) { ... }

This will be called when the object is created with the job conf.
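
A minimal sketch, using the "Operator" property from your example (the
mapper's types are illustrative):

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

public class MyMapper extends MapReduceBase
    implements Mapper<LongWritable, Text, Text, Text> {
  private String operator;

  public void configure(JobConf job) {
    // reads back the value set with jobConf.set("Operator", operator)
    operator = job.get("Operator");
  }

  public void map(LongWritable key, Text value,
                  OutputCollector<Text, Text> output, Reporter reporter)
      throws IOException {
    // use operator here
  }
}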

-- Owen

Re: Hadoop Internal Architecture writeup

2008-11-28 Thread Owen O'Malley


On Nov 28, 2008, at 9:45 AM, Ricky Ho wrote:

[Ricky]  What exactly does the job.split contains ?  I assume it  
contains the specification for each split (but not its data), such  
as what is the corresponding file and the byte range within that  
file.  Correct ?


Yes

[Ricky]  I am curious about why can't the reduce execution start  
earlier (before all the map tasks completed).


The contract is that each reduce is given the keys in sorted order.  
The reduce can't start until it is sure it has the first key. That can  
only happen after the maps are all finished.


[Ricky]  Do you mean if the job has 5000 splits, then it requires  
5000 TaskTrackers VM (one for each split) ?


In 0.19, you can enable the framework to re-use jvms between tasks in  
the same job. Look at HADOOP-249.



[Ricky]  Is this a well-know folder within the HDFS ?


It is configured by the cluster admin. However, applications should  
*not* depend on the contents or even visibility of that directory. It  
will almost certainly become inaccessible to clients as part of  
increasing security.


-- Owen


Re: How to let Reducer know on which partition it is working

2008-11-26 Thread Owen O'Malley


On Nov 26, 2008, at 4:35 AM, Jürgen Broß wrote:
 I'm not sure how to let a Reducer know in its configure() method  
which partition it will get from the Partitioner,


From:

http://hadoop.apache.org/core/docs/r0.19.0/mapred_tutorial.html#Task+JVM+Reuse

look for mapred.task.partition, which is a number from 0 to # reduces  
- 1.


-- Owen

Re: Block placement in HDFS

2008-11-24 Thread Owen O'Malley


On Nov 24, 2008, at 8:44 PM, Mahadev Konar wrote:


Hi Dennis,
 I don't think that is possible to do.


No, it is not possible.


 The block placement is determined
by HDFS internally (which is local, rack local and off rack).


Actually, it was changed in 0.17 or so to be node-local, off-rack, and  
a second node off rack.


-- Owen


Re: A question about the combiner, reducer and the Output value class: can they be different?

2008-11-16 Thread Owen O'Malley
On Sun, Nov 16, 2008 at 2:18 PM, Saptarshi Guha [EMAIL PROTECTED] wrote:

 Hello,
If my understanding is correct, the combiner will read in values for
 a given key, process it, output it and then **all** values for a key are
 given to the reducer.


Not quite. The flow looks like RecordReader -> Mapper -> Combiner* ->
Reducer -> OutputFormat.

The Combiner may be called 0, 1, or many times on each key between the
mapper and reducer. Combiners are just an application specific optimization
that compress the intermediate output. They should not have side effects or
transform the types. Unfortunately, since there isn't a separate interface
for Combiners, there is isn't a great place to document this requirement.
I've just filed HADOOP-4668 to improve the documentation.


   Then it ought to be possible for the combiner to be of the form
  ... Reducer<IntWritable, Text, IntWritable, BytesWritable>
and the reducer:
  ... Reducer<IntWritable, BytesWritable, IntWritable, Text>


Since the combiner may be called an arbitrary number of times, it must have
the same input and output types. So the parts generically look like:

input: InputFormat<K1,V1>
mapper: Mapper<K1,V1,K2,V2>
combiner: Reducer<K2,V2,K2,V2>
reducer: Reducer<K2,V2,K3,V3>
output: RecordWriter<K3,V3>

so you probably need to move the code that was changing the type into the
last step of the mapper.
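
Concretely, for your types that wiring would look something like this
(class names are hypothetical; the mapper now emits BytesWritable itself):

  conf.setMapperClass(MyMapper.class);      // Mapper<K1, V1, IntWritable, BytesWritable>
  conf.setCombinerClass(MyCombiner.class);  // Reducer<IntWritable, BytesWritable, IntWritable, BytesWritable>
  conf.setReducerClass(MyReducer.class);    // Reducer<IntWritable, BytesWritable, IntWritable, Text>
  conf.setMapOutputKeyClass(IntWritable.class);
  conf.setMapOutputValueClass(BytesWritable.class);
  conf.setOutputKeyClass(IntWritable.class);
  conf.setOutputValueClass(Text.class);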

-- Owen


Re: Writing to multiple output channels

2008-11-13 Thread Owen O'Malley

On Nov 13, 2008, at 9:38 PM, Sunil Jagadish wrote:

I have a mapper which needs to write output into two different kinds  
of

files (output.collect()).
For my purpose, I do not need any reducers.


Set the number of reduces to 0.
Open a sequence file in the mapper and write the second stream to it.
You'll end up with two files per mapper.
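
A rough sketch of what the mapper side could look like (the side path and
the Text/Text types are placeholders):

  import java.io.IOException;
  import org.apache.hadoop.fs.FileSystem;
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.io.LongWritable;
  import org.apache.hadoop.io.SequenceFile;
  import org.apache.hadoop.io.Text;
  import org.apache.hadoop.mapred.*;

  public class TwoStreamMapper extends MapReduceBase
      implements Mapper<LongWritable, Text, Text, Text> {

    private SequenceFile.Writer sideWriter;

    public void configure(JobConf job) {
      try {
        FileSystem fs = FileSystem.get(job);
        // one side file per map task; mapred.task.id keeps the names unique
        Path side = new Path("/user/me/side/" + job.get("mapred.task.id"));
        sideWriter = SequenceFile.createWriter(fs, job, side,
                                               Text.class, Text.class);
      } catch (IOException e) {
        throw new RuntimeException(e);
      }
    }

    public void map(LongWritable key, Text value,
                    OutputCollector<Text, Text> output,
                    Reporter reporter) throws IOException {
      output.collect(value, value);     // first stream: normal job output
      sideWriter.append(value, value);  // second stream: the sequence file
    }

    public void close() throws IOException {
      sideWriter.close();
    }
  }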

-- Owen


Re: reduce more than one way

2008-11-09 Thread Owen O'Malley


On Nov 7, 2008, at 12:35 PM, Elia Mazzawi wrote:

I have 2 hadoop map/reduce programs that have the same map, but
different reduce methods.


can i run them in a way so that the map only happens once?


If the input to the reduces is the same, you can put the two reduces  
together and use one of the multiple output libraries. That will let  
your reducer produce two different output directories.
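
For example, with the MultipleOutputs library in org.apache.hadoop.mapred.lib
(a sketch only; check the exact API of your release, and the output names
"typeA"/"typeB" and the Text/Text types are made up):

  // at job setup time
  MultipleOutputs.addNamedOutput(conf, "typeA",
      TextOutputFormat.class, Text.class, Text.class);
  MultipleOutputs.addNamedOutput(conf, "typeB",
      TextOutputFormat.class, Text.class, Text.class);

  // in the combined reducer
  private MultipleOutputs mos;

  public void configure(JobConf job) {
    mos = new MultipleOutputs(job);
  }

  public void reduce(Text key, Iterator<Text> values,
                     OutputCollector<Text, Text> output,
                     Reporter reporter) throws IOException {
    while (values.hasNext()) {
      Text v = values.next();
      mos.getCollector("typeA", reporter).collect(key, v);  // first reduce's logic
      mos.getCollector("typeB", reporter).collect(key, v);  // second reduce's logic
    }
  }

  public void close() throws IOException {
    mos.close();
  }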


-- Owen


Re: Hadoop Design Question

2008-11-06 Thread Owen O'Malley

On Nov 6, 2008, at 11:29 AM, Ricky Ho wrote:


Disk I/O overhead
==
- The output of a Map task is written to a local disk and then later
uploaded to the Reduce task.  While this enables a simple recovery
strategy when the map task fails, it incurs additional disk I/O
overhead.


That is correct. However, Linux does very well at using extra ram for  
buffer caches, so as long as you enable write behind it won't be a  
performance problem. You are right that the primary motivation is both  
recoverability and not needing the reduces running until after maps  
finish.


  So I am wondering if there is an option to bypass the step of
writing the map result to the local disk.


Currently no.

- In the current setting, it sounds like no reduce task will be
started before all map tasks have completed.  If there are a
few slow-running map tasks, the whole job will be delayed.


The application's reduce function can't start until the last map  
finishes because the input to the reduce is sorted. Since the last map  
may generate the first keys that must be given to the reduce, the  
reduce must wait.


- The overall job execution can be shortened if the reduce tasks can
start their processing as soon as some map results are available
rather than waiting for all the map tasks to complete.


But it would violate the specification of the framework that the input  
to reduce is completely sorted.


- Therefore it is impossible for the reduce phase of Job1 to stream
its output data to a file while the map phase of Job2 starts reading
the same file.  Job2 can only start after ALL REDUCE TASKS of Job1
are completed, which makes pipelining between jobs impossible.


It is currently not supported, but the framework could be extended to  
let the client add input splits after the job has started. That would  
remove the hard synchronization between jobs.


- This means the partitioning function has to be chosen carefully to  
make sure the workload of the reduce processes is balanced.  (maybe  
not a big deal)


Yes, the partitioner must balance the workload between the reduces.

- Are there any thoughts of running a pool of reduce tasks on the
same key and having them combine their results later?


That is called the combiner. It is called multiple times as the data  
is merged together. See the word count example. If the reducer does  
data reduction, using combiners is very important for performance.


-- Owen


Re: More Hadoop Design Question

2008-11-06 Thread Owen O'Malley


On Nov 6, 2008, at 2:30 PM, Ricky Ho wrote:

Hmmm, sounds like the combiner is invoked after the map() process  
completed for the file split.


No. The data path is complex, but the combiner is called when the map  
outputs are being spilled to disk. So roughly, the map will output  
key, value pairs until the io.sort.mb buffer is full, the contents are  
sorted and fed to the combiner. The output of the combiner is written  
to disk. When there are enough spills on disk, it will merge them  
together, call the combiner, and write to disk. When the map finishes,  
the final multi-level merge is done.


Since the reduce is also doing multi-level sort, it will also call the  
combiner when a merge is done (other than the final merge, which is  
fed into the reduce).


That means, before the combiner function starts, all the
intermediate map() output results will be kept in memory?  Any
comment on the memory footprint consumption?


The memory is bound by io.sort.mb.
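
If you need more room there, the knobs are io.sort.mb (and io.sort.factor
for the merges), e.g. in the job conf (the values here are only
illustrative):

  conf.setInt("io.sort.mb", 200);      // MB of buffer for sorting map output
  conf.setInt("io.sort.factor", 50);   // streams merged at once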

 I think a sufficient condition is just to make sure the reduce task
will not COMPLETE before all the map tasks have completed.  We don't
need to make sure the reduce task will not START before all map
tasks have completed.  This can be achieved easily by letting the
iterator.next() call within the reduce() method block.


*Sigh* no. The reduce function is invoked once per unique key, and it
is called in ascending order of keys. Since the final map may return
a's when previously you've only seen b's and c's, you can't call the
reduce with the b and then later call it with the a.


There is another potential issue in the reduce() API: can you
explain why we need to expose the OutputCollector to the reduce()
method?  For example, is it possible for the key in
output.collect() to be a different key from the reduce method
parameter?  What happens if two reduce calls (starting with different
keys) write their output to the same key?


The reduce is allowed to have different input and output types. There  
are *four* type parameters.


Reducer<KeyIn, ValueIn, KeyOut, ValueOut>

The output of the reduce is not resorted. If the reduce doesn't use
the same key as the input, the output of the reduce won't be sorted.
Duplicate keys on reduce output (either within the same reduce or
different ones) are not a problem for the framework.


However, this requires some change of the current Reducer  
interface.  Currently the reduce() method is called once per key.   
We want that to be called once per map result (within the same  
key).  What I mean is the following interface ...


There is a library that lets you run a chain of maps, if that is the  
semantics you are looking for. For map/reduce, the sort is a very  
fundamental piece. If you don't need sort between map and reduce, you  
can set reduces = 0 and run much faster.



Does it make sense ?


Not really. Most map/reduce applications need the other semantics.

-- Owen


Re: key,Values at Reducer are local or present in DFS

2008-11-05 Thread Owen O'Malley

On Nov 5, 2008, at 2:21 PM, Tarandeep Singh wrote:

I want to know whether the key,values received by a particular  
reducer at a

node are stored locally on that node or are stored on DFS (and hence
replicated over cluster according to replication factor set by user)


Map outputs (and reduce inputs) are stored on local disk and not HDFS.  
The data is moved between computers via http.



One more question - how does the framework replicate the data? Say Node A
writes a file, is it guaranteed that at least one copy will be stored
on node A?


HDFS writes all of the replicas in parallel. For the most part, it  
writes to the local (same node) DataNode first, that DataNode sends it  
to a DataNode on another rack, and that DataNode sends it to a third  
DataNode on the other rack.


-- Owen


Re: Any Way to Skip Mapping?

2008-11-02 Thread Owen O'Malley
If you don't need a sort, which is what it sounds like, Hadoop  
supports that by turning off the reduce. That is done by setting the  
number of reduces to 0. This typically is much faster than if you need  
the sort. It also sounds like you may need/want the library that does  
map-side joins.

http://tinyurl.com/43j5pp
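
(For the no-sort case, the job setup is just the following sketch;
everything else in the job stays the same:

  conf.setNumReduceTasks(0);  // map output goes straight to the output format
)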

-- Owen


ApacheCon US 2008

2008-10-31 Thread Owen O'Malley
Just a reminder that ApacheCon US is next week in New Orleans. There  
will be a lot of Hadoop developers and talks. (I'm CC'ing core-user  
because it has the widest coverage. Please join the low traffic  
[EMAIL PROTECTED] list for cross sub-project announcements.)


* Hadoop Camp with lots of talks about Hadoop
  o Introduction to Hadoop by Owen O'Malley
  o A Tour of Apache Hadoop by Tom White
  o Programming Hadoop Map/Reduce by Arun Murthy
  o Hadoop at Yahoo! by Eric Baldeschwieler
  o Hadoop Futures Panel
  o Using Hadoop for an Intranet Search Engine by Shivakumar  
Vaithyanthan

  o Cloud Computing Testbed by Thomas Sandholm
  o Improving Virtualization and Performance Tracing of  
Hadoop with Open Solaris

  by George Porter
  o An Insight into Hadoop Usage at Facebook by Dhruba  
Borthakur

  o Pig by Alan Gates
  o Zookeeper, Coordinating the Distributed Application by  
Ben Reed

  o Querying JSON Data on Hadoop using Jaql by Kevin Beyer
  o HBase by Michael Stack
* Hadoop training on Practical Problem Solving in Hadoop
* Cloudera is providing a test Hadoop cluster and a Hadoop  
hacking contest.


There is also a new Hadoop tutorial available.

-- Owen

Re: Mapper settings...

2008-10-31 Thread Owen O'Malley


On Oct 31, 2008, at 3:15 PM, Bhupesh Bansal wrote:


Why do we need these setters in JobConf ??

jobConf.setMapOutputKeyClass(String.class);

jobConf.setMapOutputValueClass(LongWritable.class);


Just historical. The Mapper and Reducer interfaces didn't use to be  
generic. (Hadoop used to run on Java 1.4 too...)


It would be nice to remove the need to call them. There is an old bug  
open to check for consistency HADOOP-1683. It would be even better to  
make the setting of both the map and reduce output types optional if  
they are specified by the template parameters.
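
Until that happens, a job whose intermediate types differ from its final
output types needs both sets of calls, e.g. (the types here are just
examples):

  // final (reduce) output types
  conf.setOutputKeyClass(Text.class);
  conf.setOutputValueClass(LongWritable.class);
  // intermediate (map) output types, when they differ from the above
  conf.setMapOutputKeyClass(IntWritable.class);
  conf.setMapOutputValueClass(Text.class);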


-- Owen


Re: Is there a unique ID associated with each task?

2008-10-30 Thread Owen O'Malley


On Oct 30, 2008, at 9:03 AM, Joel Welling wrote:

 I'm writing a Hadoop Pipes application, and I need to generate a  
bunch
of integers that are unique across all map tasks.  If each map task  
has
a unique integer ID, I can make sure my integers are unique by  
including

that integer ID.  I have this theory that each map task has a unique
identifier associated with some configuration parameter, but I don't
know the name of that parameter.
 Is there an integer associated with each task?  If so, how do I get
it?  While we're at it, is there a way to get the total number of map
tasks?


There is a unique identifier for each task and even each task attempt.  
From:


http://hadoop.apache.org/core/docs/current/mapred_tutorial.html#Task+Execution+%26+Environment

mapred.tip.id   The task id
mapred.task.id  The task attempt id

Since you probably don't care about re-execution, you probably want  
mapred.tip.id.


If you just want the number of the mapred.tip.id as an integer, you  
can use mapred.task.partition, which will be a number from 0 to M -  
1 for maps or 0 to R - 1 for reduces.


-- Owen


Re: I am attempting to use setOutputValueGroupingComparator as a secondary sort on the values

2008-10-29 Thread Owen O'Malley

I uploaded a patch that does a secondary sort. Take a look at:

https://issues.apache.org/jira/browse/HADOOP-4545

It reads input with two numbers per line, such as:

-1 -4
-3 23
5 10
-1 -2
-1 300
-1 10
4 1
4 2
4 10
4 -1
4 -10
10 20
10 30
10 25

And produces output like (with 2 reduces):

part-0:

4   -10
4   -1
4   1
4   2
4   10

10  20
10  25
10  30

part-1:

-3  23

-1  -4
-1  -2
-1  10
-1  300

5   10

-- Owen


Re: Understanding file splits

2008-10-28 Thread Owen O'Malley


On Oct 28, 2008, at 6:29 AM, Malcolm Matalka wrote:


I am trying to write an InputFormat and I am having some trouble
understanding how my data is being broken up.  My input is a previous
hadoop job and I have added code to my record reader to print out the
FileSplit's start and end position, as well as where the last record I
read was located.  My records are all about 100 bytes, so fairly small.
For one file I am seeing the following output:



start: 0 end: 45101881 pos: 67108800

start: 45101880 end: 90203762 pos: 67108810

start: 90203761 end: 135305643 pos: 134217621

start: 135305642 end: 180170980 pos: 180170902


It would help if you printed the FileSplits themselves, so that we can
see the file names. I don't know where the pos is coming from.
FileSplits only have offset and length.



Note, I have also specified in my InputFormat that isSplittable returns
false.


That isn't working. Otherwise, you would only have FileSplits that  
start at 0.


I do not understand why there is overlap.  Note that on the second  
one,

I never appear to reach the end position.


The RecordReaders have to read more than their split because they can  
only process whole records. So, in the case of text files and  
TextInputFormat, the split is picked blindly. Then all of the splits  
that don't start at offset 0, read until they reach a newline. They  
start from there and read until the newline *past* the end of the  
split. That way all of the data is processed and no partial records  
are processed.
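
In rough pseudo-Java, the logic is (a sketch only, not the real class; the
helpers skipToNextNewline, readLine, lineBytes, and emit are hypothetical):

  long pos = split.getStart();
  long end = split.getStart() + split.getLength();

  if (pos != 0) {
    // not the first split: the partial first line belongs to the previous split
    pos += skipToNextNewline(in);
  }
  while (pos < end) {
    // read one whole line; for the line that straddles 'end' this reads past
    // the end of the split
    String line = readLine(in);
    emit(pos, line);
    pos += lineBytes(line);
  }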


-- Owen


Re: I am attempting to use setOutputValueGroupingComparator as a secondary sort on the values

2008-10-28 Thread Owen O'Malley


On Oct 28, 2008, at 7:53 AM, David M. Coe wrote:

My mapper is Mapper<LongWritable, Text, IntWritable, IntWritable>
and my

reducer is the identity.  I configure the program using:

conf.setOutputKeyClass(IntWritable.class);
conf.setOutputValueClass(IntWritable.class);

conf.setMapperClass(MapClass.class);
conf.setReducerClass(IdentityReducer.class);

conf.setOutputKeyComparatorClass(IntWritable.Comparator.class);
conf.setOutputValueGroupingComparator(IntWritable.Comparator.class);


The problem is that your map needs to look like:

class IntPair implements Writable {
  private int left;
  private int right;
  public void set(int left, int right) { ... }
  public int getLeft() {...}
  public int getRight() {...}
}

your Mapper should be Mapper<LongWritable, Text, IntPair, IntWritable>
and should emit


IntPair key = new IntPair();
IntWritable value = new IntWritable();
...
key.set(keyValue, valueValue);
value.set(valueValue);
output.collect(key, value);

Your sort comparator should compare both left and right in the
pair.

The grouping comparator should only look at left in the pair.
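
A minimal sketch of those two comparators (assuming IntPair also implements
WritableComparable so it can be used as a key):

  public static class SortComparator extends WritableComparator {
    public SortComparator() { super(IntPair.class, true); }
    public int compare(WritableComparable a, WritableComparable b) {
      IntPair l = (IntPair) a;
      IntPair r = (IntPair) b;
      if (l.getLeft() != r.getLeft()) {
        return l.getLeft() < r.getLeft() ? -1 : 1;
      }
      return l.getRight() == r.getRight() ? 0
           : (l.getRight() < r.getRight() ? -1 : 1);
    }
  }

  public static class GroupComparator extends WritableComparator {
    public GroupComparator() { super(IntPair.class, true); }
    public int compare(WritableComparable a, WritableComparable b) {
      int l = ((IntPair) a).getLeft();
      int r = ((IntPair) b).getLeft();
      return l == r ? 0 : (l < r ? -1 : 1);
    }
  }

  // and wire them in:
  conf.setOutputKeyComparatorClass(SortComparator.class);
  conf.setOutputValueGroupingComparator(GroupComparator.class);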

Your Reducer should be Reducer<IntPair, IntWritable, IntWritable,
IntWritable>


output.collect(new IntWritable(key.getLeft()), value);

Is that clearer?

-- Owen


Re: help: InputFormat problem ?

2008-10-27 Thread Owen O'Malley
If your application that you are drawing from is doing some sort of  
web crawl that connects to lots of random servers, you may want to use  
MultiThreadedMapRunner and do the remote connections in the map. If  
you are just connecting to a small set of servers for each map, you  
should put it in the InputFormat.


Using MultiThreadedMapRunner means that each map can have multiple  
threads transferring data from the external sources.


-- Owen


Re: Help: How to change number of mappers in Hadoop streaming?

2008-10-26 Thread Owen O'Malley


On Oct 26, 2008, at 8:38 AM, chaitanya krishna wrote:

I forgot to mention that although the number of map tasks is set in
the code as I mentioned before, the actual number of map tasks is not
necessarily the same number but is very close to this number.


The number of reduces is precisely the one configured by the job. The  
number of maps depends on the InputFormat selected. For  
FileInputFormats, which include TextInputFormat and  
SequenceFileInputFormat, the formula is complicated, but it usually  
defaults to the greater of the number requested or the number of hdfs  
blocks in the input.
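
Roughly, the old FileInputFormat computes (a sketch; the details vary a bit
by release):

  long goalSize = totalInputBytes / requestedMaps;
  long splitSize = Math.max(minSplitSize, Math.min(goalSize, blockSize));
  // each file then contributes about fileLength / splitSize maps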


-- Owen


Re: Any one successfully ran the c++ pipes example?

2008-10-17 Thread Owen O'Malley


On Oct 16, 2008, at 1:40 PM, Zhengguo 'Mike' SUN wrote:

I was trying to write an application using the pipes api. But it  
seemed the serialization part is not working correctly. More  
specifically, I can't deserialize a string from a StringInStream
constructed from context.getInputSplit(). Even with the examples  
bundled in the distribution archive(wordcount-nopipe.cc), it threw  
exceptions. If anyone had experience on that, please kindly give  
some advice.


So you mean the example with a C++ record reader? You have to use the  
InputFormat that generates input splits that consists of strings. Look  
at src/test/org/apache/hadoop/pipes/WordCountInputFormat.java


It would be useful to have a C++ impl of FileInputSplit too...

-- Owen


Re: Distributed cache Design

2008-10-16 Thread Owen O'Malley


On Oct 16, 2008, at 1:52 PM, Bhupesh Bansal wrote:


We at Linkedin are trying to run some Large Graph Analysis problems on
Hadoop. The fastest way to run would be to keep a copy of the whole
graph in RAM at all mappers. (The graph size is about 8G in RAM.) We have
a cluster of 8-core machines with 8G on each.


The best way to deal with it is *not* to load the entire graph in one  
process. In the WebMap at Yahoo, we have a graph of the web that has  
roughly 1 trillion links and 100 billion nodes. See http://tinyurl.com/4fgok6 
 . To invert the links, you process the graph in pieces and resort  
based on the target. You'll get much better performance and scale to  
almost any size.



What is the best way of doing that? Is there a way so that multiple
mappers on the same machine can access a RAM cache?  I read about the
Hadoop distributed cache; it looks like it copies the file (hdfs / http)
locally on the slaves but not necessarily in RAM.


You could mmap the file from the distributed cache using MappedByteBuffer.
Then there will be only one copy shared between the JVMs...
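
A sketch of that (the index into the cache files is illustrative, and the
calls should be wrapped in try/catch since configure() can't throw
IOException):

  // in configure(): map the cached graph file read-only; the OS shares the
  // pages across all task JVMs on the node, so there is one physical copy
  Path[] cached = DistributedCache.getLocalCacheFiles(job);
  FileInputStream in = new FileInputStream(cached[0].toString());
  FileChannel channel = in.getChannel();
  MappedByteBuffer graph =
      channel.map(FileChannel.MapMode.READ_ONLY, 0, channel.size());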


-- Owen


Re: Distributed cache Design

2008-10-16 Thread Owen O'Malley


On Oct 16, 2008, at 3:09 PM, Bhupesh Bansal wrote:

Let's say I want to implement a DFS on my graph. I am not able to
picture implementing it by doing the graph in pieces without putting a
depth bound on it (3-4). Let's say we have 200M (4GB) edges to start with.


Start by watching the lecture on graph algorithms in map/reduce:

http://www.youtube.com/watch?v=BT-piFBP4fE

And see if that makes it clearer. If not, ask more questions. *smile*

-- Owen


Re: Seeking examples beyond word count

2008-10-14 Thread Owen O'Malley


On Oct 14, 2008, at 8:37 PM, Bert Schmidt wrote:

I'm trying to think of how I might use it yet all the examples I  
find are variations of word count.


Look in the src/examples directory.
  PiEstimator - estimates the value of pi using distributed brute force
  Pentomino - solves Pentomino tile placement problems including one  
sided variants
  Terasort - tools to generate the required data, sort it into a  
total order, and verify the sort order


There is also distcp in src/tools that uses map/reduce to copy a lot  
of files between clusters.


Are there any interesting examples of how people are using it for  
real tasks?


A final pointer would be to Nutch, which uses Hadoop for distribution.

-- Owen



Re: Sharing an object across mappers

2008-10-03 Thread Owen O'Malley


On Oct 3, 2008, at 7:49 AM, Devajyoti Sarkar wrote:

Briefly going through the DistributedCache information, it seems to  
be a way

to distribute files to mappers/reducers.


Sure, but it handles the distribution problem for you.


One still needs to read the
contents into each map/reduce task VM.


If the data is straight binary data, you could just mmap it from the  
various tasks. It would be pretty efficient.


The other direction is to use the MultiThreadedMapRunner and run  
multiple maps as threads in the same VM. But unless your maps are CPU  
heavy or contacting external servers, it probably won't help as much  
as you'd like.


-- Owen

