Re: Pregel
On Jun 25, 2009, at 9:42 PM, Mark Kerzner wrote: my guess, as good as anybody's, is that Pregel is to large graphs what Hadoop is to large datasets. I think it is much more likely a language that allows you to easily define fixed-point algorithms. I would imagine a distributed version of something similar to Michal Young's GenSet. http://portal.acm.org/citation.cfm?doid=586094.586108 I've been trying to figure out how to justify working on a project like that for a couple of years, but haven't yet. (I have a background in static program analysis, so I've implemented similar stuff.) In other words, Pregel is the next natural step for massively scalable computations after Hadoop. I wonder if it uses map/reduce as a base or not. It would be easier to use map/reduce, but a direct implementation would be more performant. In either case, it is a new hammer. From what I see, it likely won't replace map/reduce, Pig, or Hive; but rather support a different class of applications much more directly than you can under map/reduce. -- Owen
Re: A simple performance benchmark for Hadoop, Hive and Pig
On Thu, Jun 18, 2009 at 9:29 PM, Zheng Shao zs...@facebook.com wrote: Yuntao Jia, our intern this summer, did a simple performance benchmark for Hadoop, Hive and Pig based on the queries in the SIGMOD 2009 paper: A Comparison of Approaches to Large-Scale Data Analysis It should be noted that no one on the Pig team was involved in setting up the benchmarks, and the queries don't follow the Pig cookbook suggestions for writing efficient queries, so these results should be considered *extremely* preliminary. Furthermore, I can't see any way that Hive should be able to beat raw map/reduce, since Hive uses map/reduce to run the job. In the future, it would be better to involve the respective communities (mapreduce-dev and pig-dev) well before pushing benchmark results out to the user lists. The Hadoop project, which includes all three subprojects, needs to be a cooperative community that is trying to build the best software we can. Getting benchmark numbers is good, but it is better done in a collaborative manner. -- Owen
Re: Practical limit on emitted map/reduce values
Keys and values can be large. They are certainly capped above by Java's 2GB limit on byte arrays. More practically, you will have problems running out of memory with keys or values of 100 MB. There is no restriction that a key/value pair fits in a single hdfs block, but performance would suffer. (In particular, the FileInputFormats split at block sized chunks, which means you will have maps that scan an entire block without processing anything.) -- Owen
Re: Practical limit on emitted map/reduce values
On Jun 18, 2009, at 8:45 AM, Leon Mergen wrote: Could you perhaps elaborate on that 100 MB limit ? Is that due to a limit that is caused by the Java VM heap size ? If so, could that, for example, be increased to 512MB by setting mapred.child.java.opts to '-Xmx512m' ? A couple of points: 1. The 100MB was just for ballpark calculations. Of course if you have a large heap, you can fit larger values. Don't forget that the framework is allocating big chunks of the heap for its own buffers, when figuring out how big to make your heaps. 2. Having large keys is much harder than large values. When doing a N-way merge, the framework has N+1 keys and 1 value in memory at a time. -- Owen
Re: multiple file input
On Jun 18, 2009, at 10:56 AM, pmg wrote: Each line from FileA gets compared with every line from FileB1, FileB2 etc. etc. FileB1, FileB2 etc. are in a different input directory In the general case, I'd define an InputFormat that takes two directories, computes the input splits for each directory and generates a new list of InputSplits that is the cross-product of the two lists. So instead of FileSplit, it would use a FileSplitPair that gives the FileSplit for dir1 and the FileSplit for dir2 and the record reader would return a TextPair with left and right records (ie. lines). Clearly, you read the first line of split1 and cross it by each line from split2, then move to the second line of split1 and process each line from split2, etc. You'll need to ensure that you don't overwhelm the system with either too many input splits (ie. maps). Also don't forget that N^2/M grows much faster with the size of the input (N) than the M machines can handle in a fixed amount of time. Two input directories 1. input1 directory with a single file of 600K records - FileA 2. input2 directory segmented into different files with 2Million records - FileB1, FileB2 etc. In this particular case, it would be right to load all of FileA into memory and process the chunks of FileB/part-*. Then it would be much faster than needing to re-read the file over and over again, but otherwise it would be the same. -- Owen
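The cross-product idea above can be sketched in plain Java. This is only an illustration: Hadoop's InputSplit types are replaced with strings, and the InputFormat/RecordReader machinery (the hypothetical FileSplitPair mentioned above) is omitted; the point is just how the two split lists combine.

```java
import java.util.ArrayList;
import java.util.List;

// Illustration of the cross-product InputFormat idea: given the splits
// computed for each of two input directories, generate one pair per
// combination. A real implementation would subclass InputFormat and
// return FileSplitPair objects instead of String[].
public class CrossSplits {
    public static List<String[]> crossProduct(List<String> dir1Splits,
                                              List<String> dir2Splits) {
        List<String[]> pairs = new ArrayList<>();
        for (String left : dir1Splits) {
            for (String right : dir2Splits) {
                pairs.add(new String[] { left, right });
            }
        }
        return pairs;
    }
}
```

Note the growth this implies: two directories with M and N splits produce M*N map tasks, which is exactly the "too many input splits" caution above.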
Re: Anyway to sort keys before Reduce function in Hadoop ?
On Wed, Jun 17, 2009 at 12:26 PM, Chuck Lam chuck@gmail.com wrote: an alternative is to create a new WritableComparator and then set it in the JobConf object with the method setOutputKeyComparatorClass(). You can use IntWritable.Comparator as a start. The important part of that is to define a RawComparator for your key class and call JobConf.setOutputKeyComparatorClass with it. So if you wanted to invert the default sort order of IntWritable keys, you could:

public class InvertedIntWritableComparator extends IntWritable.Comparator {
  public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
    return -1 * super.compare(b1, s1, l1, b2, s2, l2);
  }
}

and then:

job.setOutputKeyComparatorClass(InvertedIntWritableComparator.class);

-- Owen
Re: MapContext.getInputSplit() returns nothing
Sorry, I forget how much isn't clear to people who are just starting. FileInputFormat creates FileSplits. The serialization is very stable and can't be changed without breaking things. The reason that pipes can't stringify it is that the string forms of input splits are ambiguous (and since it is user code, we really can't make assumptions about it). The format of FileSplit is:

16-bit filename byte length
filename in bytes
64-bit offset
64-bit length

Technically the filename uses a funky utf-8 encoding, but in practice as long as the filename contains only ascii characters, the bytes are ascii. Look at org.apache.hadoop.io.UTF8.writeString for the precise definition. -- Owen
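As a rough sketch (not Hadoop's actual code), the wire format above can be read back with plain DataInputStream calls. This assumes an ascii-only filename, per the caveat above; the serialize helper exists only to round-trip a test record in the same layout.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

// Sketch of reading the FileSplit wire format described above:
// 16-bit filename byte length, filename bytes, 64-bit offset, 64-bit length.
public class FileSplitCodec {
    /** Parses {filename, offset, length} out of the layout described above. */
    public static Object[] parse(byte[] raw) {
        try {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(raw));
            int nameLen = in.readUnsignedShort();   // 16-bit filename byte length
            byte[] name = new byte[nameLen];
            in.readFully(name);                     // filename bytes
            long offset = in.readLong();            // 64-bit offset
            long length = in.readLong();            // 64-bit length
            return new Object[] { new String(name, StandardCharsets.US_ASCII), offset, length };
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Builds a record in the same layout, for round-trip testing. */
    public static byte[] serialize(String file, long offset, long length) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            byte[] name = file.getBytes(StandardCharsets.US_ASCII);
            out.writeShort(name.length);
            out.write(name);
            out.writeLong(offset);
            out.writeLong(length);
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```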
Re: MapContext.getInputSplit() returns nothing
*Sigh* We need Avro for input splits. That is the expected behavior. It would be great if someone wrote a C++ FileInputSplit class that took a binary string and converted it back to a filename, offset, and length. -- Owen
Re: speedy Google Maps driving directions like implementation
On Jun 10, 2009, at 2:43 AM, Lukáš Vlček wrote: I am wondering how Google implemented the driving directions function in the Maps. More specifically how did they do it that it is so fast. I asked on Google engineer about this and all he told me is just that there are bunch of MapReduce cycles involved in this process Like search, the key is to use batch map/reduce jobs to generate the indexes that are used to answer the query in real-time. The right question is what kind of indexes can be used to answer the routing question quickly. Generalizing back to a graph, you could use map/ reduce jobs to generate the all-pairs-shortest-distance matrix. Then it is easy to use the matrix to solve arbitrary shortest path problems in linear time. -- Owen
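The precompute-then-lookup idea can be sketched on a single machine with Floyd-Warshall (the distributed version would build the same all-pairs matrix with batch map/reduce jobs, as described above); once the matrix exists, a shortest-distance query is a constant-time lookup.

```java
// Sketch of the batch-precompute idea: build the all-pairs-shortest-
// distance matrix once, then answer queries by indexing into it.
// Floyd-Warshall stands in here for the chain of map/reduce jobs.
public class AllPairs {
    static final int INF = 1_000_000; // "no edge" marker, safely below overflow

    public static int[][] shortestDistances(int[][] edgeWeights) {
        int n = edgeWeights.length;
        int[][] dist = new int[n][n];
        for (int i = 0; i < n; i++) {
            dist[i] = edgeWeights[i].clone();
        }
        // Relax every path through each intermediate node k.
        for (int k = 0; k < n; k++)
            for (int i = 0; i < n; i++)
                for (int j = 0; j < n; j++)
                    if (dist[i][k] + dist[k][j] < dist[i][j])
                        dist[i][j] = dist[i][k] + dist[k][j];
        return dist; // a query for (src, dst) is now just dist[src][dst]
    }
}
```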
Re: Hadoop benchmarking
Take a look at Arun's slide deck on Hadoop performance: http://bit.ly/EDCg3 It is important to get io.sort.mb large enough, and io.sort.factor should be closer to 100 than the default of 10. I'd also use large block sizes to reduce the number of maps. Please see the deck for other important factors. -- Owen
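As an illustration only, the knobs mentioned above would go in hadoop-site.xml along these lines; the values here are placeholders for discussion, not recommendations for any particular cluster.

```xml
<!-- Illustrative values only; tune to your own hardware and workload. -->
<property>
  <name>io.sort.mb</name>
  <value>200</value>
</property>
<property>
  <name>io.sort.factor</name>
  <value>100</value>
</property>
<property>
  <name>dfs.block.size</name>
  <!-- 256 MB blocks: larger blocks mean fewer maps -->
  <value>268435456</value>
</property>
```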
Re: Image indexing/searching with Hadoop and MPI
Ok I can understand your point - but I am sure that some people have been trying to use the map-reduce programming model to do CFD, or any other scientific computing. Any experience in this area from the list? I know of one project that assumes it has an entire Hadoop cluster, generates the hostnames in the Mapper, and uses those host lists in the Reducer to launch an MPI job. They do it because it provides a higher efficiency for doing very small data transfers. The alternative was a long chain of map/reduce jobs that have very small outputs from each phase. I wouldn't recommend using MPI under map/reduce in general, since it involves making a lot of assumptions about your application. In particular, to avoid killing your cluster, you shouldn't use checkpoints in your application and should just rerun the application from the beginning on failures. That implies that the application can't run very long (an upper bound of probably 30 minutes on 2000 nodes). That said, if you want to run other styles of applications, you really want a two-level scheduler, where the first-level scheduler allocates nodes (or partial nodes) to jobs (or frameworks). Effectively, that is what Hadoop On Demand (HOD) was doing with Torque, but I suspect there will be a more performant solution than HOD within the next year. -- Owen
Re: Fastlz coming?
On Jun 4, 2009, at 11:19 AM, Kris Jirapinyo wrote: Hopefully we can have a new page on the hadoop wiki on how to use custom compression so that people won't have to go search through the threads to find the answer in the future. Yes, it would be extremely useful if you could start a wiki page with your discoveries. Since you had to discover it, it will be clear to you what needs to be documented. On a similar note, I always find that I write the best comments in code after I leave a section for 6 months and have to figure out what I meant when I wrote it. Otherwise, it is too obvious. *smile* -- Owen
Re: InputFormat for fixed-width records?
On May 28, 2009, at 5:15 AM, Stuart White wrote: I need to process a dataset that contains text records of fixed length in bytes. For example, each record may be 100 bytes in length The update to the terasort example has an InputFormat that does exactly that. The key is 10 bytes and the value is the next 90 bytes. It is pretty easy to write, but I should upload it soon. The output types are Text, but they just have the binary data in them. -- Owen
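A minimal sketch of the slicing described above: each fixed 100-byte record yields a 10-byte key and a 90-byte value. Plain byte arrays stand in for Hadoop's Text type, and the InputFormat/RecordReader wrapping is omitted; only the record layout logic is shown.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Fixed-width record slicing as in the terasort example described above:
// first KEY_LEN bytes are the key, the rest of the record is the value.
public class FixedWidthRecords {
    static final int RECORD_LEN = 100;
    static final int KEY_LEN = 10;

    public static List<byte[][]> split(byte[] data) {
        List<byte[][]> records = new ArrayList<>();
        for (int pos = 0; pos + RECORD_LEN <= data.length; pos += RECORD_LEN) {
            byte[] key = Arrays.copyOfRange(data, pos, pos + KEY_LEN);
            byte[] value = Arrays.copyOfRange(data, pos + KEY_LEN, pos + RECORD_LEN);
            records.add(new byte[][] { key, value });
        }
        return records;
    }
}
```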
Re: Linking against Hive in Hadoop development tree
On May 20, 2009, at 3:07 AM, Tom White wrote: Why does mapred depend on hdfs? MapReduce should only depend on the FileSystem interface, shouldn't it? Yes, I should have been consistent. In terms of compile-time dependences, mapred only depends on core. -- Owen
Re: Linking against Hive in Hadoop development tree
On May 15, 2009, at 3:25 PM, Aaron Kimball wrote: Yikes. So part of sqoop would wind up in one source repository, and part in another? This makes my head hurt a bit. I'd say rather that Sqoop is in Mapred and the adapter to Hive is in Hive. I'm also not convinced that it helps to split it that way. What you need to arrange is to not have a compile-time dependence on Hive. Clearly we don't want cycles in the dependence tree, so you need to figure out how to make the adapter for Hive a plugin rather than a part of the Sqoop core. -- Owen
Re: Beware sun's jvm version 1.6.0_05-b13 on linux
On May 18, 2009, at 3:42 AM, Steve Loughran wrote: Presumably its one of those hard-to-reproduce race conditions that only surfaces under load on a big cluster so is hard to replicate in a unit test, right? Yes. It reliably happens on a 100TB or larger sort, but almost never happens on a small scale. -- Owen
Beware sun's jvm version 1.6.0_05-b13 on linux
We have observed that the default jvm on RedHat 5 can cause significant data corruption in the map/reduce shuffle for those using Hadoop 0.20. In particular, the guilty jvm is:

java version "1.6.0_05"
Java(TM) SE Runtime Environment (build 1.6.0_05-b13)
Java HotSpot(TM) Server VM (build 10.0-b19, mixed mode)

By upgrading to jvm build 1.6.0_13-b03, we fixed the problem. The observed behavior is that Jetty serves up random bytes from other transfers. In particular, some of them were valid transfers to the wrong reduce. We suspect the relevant java bug is: http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=6403933 We have also filed a bug on Hadoop to add sanity checks on the shuffle that will work around the problem: https://issues.apache.org/jira/browse/HADOOP-5783 -- Owen
Re: Linking against Hive in Hadoop development tree
On May 15, 2009, at 2:05 PM, Aaron Kimball wrote: In either case, there's a dependency there. You need to split it so that there are no cycles in the dependency tree. In the short term it looks like:

avro:
core: avro
hdfs: core
mapred: hdfs, core
hive: mapred, core
pig: mapred, core

Adding a dependence from core to hive would be bad. To integrate with Hive, you need to add a contrib module to Hive that adds it. -- Owen
Re: Access counters from within Reducer#configure() ?
On May 11, 2009, at 8:48 AM, Stuart White wrote: I'd like to be able to access my job's counters from within my Reducer's configure() method (so I can know how many records were output from my mappers). Is this possible? Thanks! The counters aren't available and if they were the information you want isn't there. Counters only flow up to the JobTracker. They never flow down to the task attempts, so each task only sees its own counters. -- Owen
Re: Is it possible to sort intermediate values and final values?
On May 7, 2009, at 12:38 PM, Foss User wrote: Where can I find this example. I was not able to find it in the src/examples directory. It is in 0.20. http://svn.apache.org/repos/asf/hadoop/core/trunk/src/examples/org/apache/hadoop/examples/SecondarySort.java -- Owen
Re: What User Accounts Do People Use For Team Dev?
Best is to use one user for map/reduce and another for hdfs. Neither of them should be root or real users. With the setuid patch (HADOOP-4490), it is possible to run the jobs as the submitted user. Note that if you do that, you no doubt want to block certain system uids (bin, mysql, etc.) -- Owen
Re: Implementing compareTo in user-written keys where one extends the other is error prone
If you use custom key types, you really should be defining a RawComparator. It will perform much much better. -- Owen
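To illustrate why, here is a sketch of the idea behind a raw comparator (this is not Hadoop's actual IntWritable.Comparator code): ints serialized big-endian, as Java's DataOutput writes them, can be ordered directly on their bytes with only the sign bit adjusted, so the sort never deserializes a key object per comparison.

```java
// Raw byte comparison of big-endian-serialized ints. Flipping the sign
// bit of the most significant byte makes unsigned byte order match
// signed int order, so negatives sort before positives.
public class RawIntCompare {
    public static int compare(byte[] b1, int s1, byte[] b2, int s2) {
        for (int i = 0; i < 4; i++) {
            int x = (b1[s1 + i] & 0xff) ^ (i == 0 ? 0x80 : 0);
            int y = (b2[s2 + i] & 0xff) ^ (i == 0 ? 0x80 : 0);
            if (x != y) return x < y ? -1 : 1;
        }
        return 0;
    }

    /** Serializes an int big-endian, as DataOutput.writeInt does. */
    public static byte[] toBytes(int v) {
        return new byte[] { (byte)(v >>> 24), (byte)(v >>> 16), (byte)(v >>> 8), (byte) v };
    }
}
```

A real Hadoop RawComparator implements the same trick against the WritableComparator API so the shuffle sort works on serialized bytes.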
Re: core-user Digest 23 Apr 2009 02:09:48 -0000 Issue 887
On Apr 22, 2009, at 10:44 PM, Koji Noguchi wrote: Nigel, When you have time, could you release 0.18.4 that contains some of the patches that make our clusters 'stable'? Is it just the patches that have already been applied to the 18 branch? Or are there more? -- Owen
Re: Is combiner and map in same JVM?
On Apr 14, 2009, at 11:10 AM, Saptarshi Guha wrote: Thanks. I am using 0.19, and to confirm, the map and combiner (in the map jvm) are run in *different* threads at the same time? And the change was actually made in 0.18. So since then, the combiner is called 0, 1, or many times on each key in both the mapper and the reducer. It is called in a separate thread from the base application in the map (in the reduce task, the combiner is only used during the shuffle). My native library is not thread safe, so I would have to implement locks. Aaron's email gave me hope (since the map and combiner would then be running sequentially), but this appears to make things complicated. Yes, you'll probably need locks around your code that isn't thread safe. -- Owen
Re: Multithreaded Reducer
On Apr 10, 2009, at 11:12 AM, Sagar Naik wrote: Hi, I would like to implement a Multi-threaded reducer. As per my understanding , the system does not have one coz we expect the output to be sorted. However, in my case I dont need the output sorted. You'd probably want to make a blocking concurrent queue of the key value pairs that are given to the reducer. Then have a pool of reducers that pull from the queue. It can be modeled on the multi- threaded map runner. Do be aware, that you'll need to clone the keys and values that are given to the reduce. -- Owen
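A sketch of that design in plain Java (strings stand in for Hadoop key/value types, and a null-keyed pair is used as a made-up shutdown sentinel): pairs go onto a BlockingQueue and a pool of workers drains it. Per the cloning caveat above, whatever is enqueued must be a fresh copy, since Hadoop reuses the objects it hands to reduce().

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Queue-plus-worker-pool reducer sketch, modeled on the multithreaded
// map runner idea described above.
public class ParallelReduce {
    /** Drains the queue with `workers` threads; a pair with a null key ends the run. */
    public static int process(BlockingQueue<String[]> queue, int workers) {
        AtomicInteger processed = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(workers);
        for (int i = 0; i < workers; i++) {
            pool.submit(() -> {
                try {
                    String[] pair;
                    while ((pair = queue.take())[0] != null) {
                        // Real (unsorted, order-independent) reduce work goes here.
                        processed.incrementAndGet();
                    }
                    queue.put(pair); // re-post the sentinel for the other workers
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(30, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return processed.get();
    }
}
```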
Re: HDFS read/write speeds, and read optimization
On Thu, Apr 9, 2009 at 9:30 PM, Brian Bockelman bbock...@cse.unl.edu wrote: On Apr 9, 2009, at 5:45 PM, Stas Oskin wrote: Hi. I have 2 questions about HDFS performance: 1) How fast are the read and write operations over network, in Mbps per second? Depends. What hardware? How much hardware? Is the cluster under load? What does your I/O load look like? As a rule of thumb, you'll probably expect very close to hardware speed. For comparison, on a 1400 node cluster, I can checksum 100 TB in around 10 minutes, which means I'm seeing read averages of roughly 166 GB/sec. For writes with replication of 3, I see roughly 40-50 minutes to write 100TB, so roughly 33 GB/sec average. Of course the peaks are much higher. Each node has 4 SATA disks, dual quad core, and 8 GB of ram. -- Owen
Re: HDFS read/write speeds, and read optimization
On Apr 10, 2009, at 9:07 AM, Stas Oskin wrote: From your experience, how RAM hungry HDFS is? Meaning, additional 4GB or ram (to make it 8GB aas in your case), really change anything? I don't think the 4 to 8GB would matter much for HDFS. For Map/Reduce, it is very important. -- Owen
Re: How many people is using Hadoop Streaming ?
On Apr 7, 2009, at 11:41 AM, Aaron Kimball wrote: Owen, Is binary streaming actually readily available? https://issues.apache.org/jira/browse/HADOOP-1722
Re: problem running on a cluster of mixed hardware due to Incompatible buildVersion of JobTracker adn TaskTracker
This was discussed over on: https://issues.apache.org/jira/browse/HADOOP-5203 Doug uploaded a patch, but no one seems to be working on it. -- Owen
Re: connecting two clusters
On Apr 6, 2009, at 9:49 PM, Mithila Nagendra wrote: Hey all I'm trying to connect two separate Hadoop clusters. Is it possible to do so? I need data to be shuttled back and forth between the two clusters. Any suggestions? You should use hadoop distcp. It is a map/reduce program that copies data, typically from one cluster to another. If you have the hftp interface enabled, you can use that to copy between hdfs clusters that are different versions. hadoop distcp hftp://namenode1:1234/foo/bar hdfs://foo/bar -- Owen
Re: best practice: mapred.local vs dfs drives
We always share the drives. -- Owen On Apr 5, 2009, at 0:52, zsongbo zson...@gmail.com wrote: I usually set mapred.local.dir to share the disk space with DFS, since some mapreduce job need big temp space. On Fri, Apr 3, 2009 at 8:36 PM, Craig Macdonald cra...@dcs.gla.ac.ukwrote: Hello all, Following recent hardware discussions, I thought I'd ask a related question. Our cluster nodes have 3 drives: 1x 160GB system/scratch and 2x 500GB DFS drives. The 160GB system drive is partitioned such that 100GB is for job mapred.local space. However, we find that for our application, mapred.local free space for map output space is the limiting parameter on the number of reducers we can have (our application prefers less reducers). How do people normally work for dfs vs mapred.local space. Do you (a) share the DFS drives with the task tracker temporary files, Or do you (b) keep them on separate partitions or drives? We originally went with (b) because it prevented a run-away job from eating all the DFS space on the machine, however, I'm beginning to realise the disadvantages. Any comments? Thanks Craig
Re: Users are not properly authenticated in Hadoop
On Apr 5, 2009, at 3:30 AM, Foss User wrote: However, I see that anyone can easily impersonate fossist Yes, it is easy to work around the security in Hadoop. It is only intended to prevent accidents, such as the time a student accidentally wiped out the entire class' home directories. If not, I have to go for other measures like firewalls, etc. But if something is available in Hadoop itself, it would be great. You currently need firewalls to protect Hadoop. There is work underway to add real authentication to Hadoop, which will enable real security. -- Owen
Re: How many people is using Hadoop Streaming ?
On Apr 3, 2009, at 9:42 AM, Ricky Ho wrote: Has anyone benchmarked the performance difference of using Hadoop ? 1) Java vs C++ 2) Java vs Streaming Yes, a while ago. When I tested it using sort, Java and C++ were roughly equal and streaming was 10-20% slower. Most of the cost with streaming came from the stringification. 1) I can pick the language that offers a different programming paradigm (e.g. I may choose a functional language, or logic programming if they suit the problem better). In fact, I can even choose Erlang for the map() and Prolog for the reduce(). Mix and match can optimize me more. 2) I can pick the language that I am familiar with, or one that I like. 3) Easy to switch to another language in a fine-grained incremental way if I choose to do so in future. Additionally, the interface to streaming is very stable. *smile* It also supports legacy applications well. The downsides are that: 1. The interface is very thin and has minimal functionality. 2. Streaming combiners don't work very well. Many streaming applications buffer in the map and run the combiner internally. 3. Streaming doesn't group the values in the reducer. In Java or C++, you get:

key1, (value1, value2, ...)
key2, (value3, ...)

in streaming you get:

key1 value1
key1 value2
key2 value3

and your application needs to detect the key changes. 4. Binary data support has only recently been added to streaming. Am I missing something here ? or is the majority of Hadoop applications written in Hadoop Streaming ? On Yahoo's research clusters, typically 1/3 of the applications are streaming, 1/3 pig, and 1/3 java. -- Owen
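Downside 3 can be illustrated with the key-change loop every streaming reducer has to carry. This is a plain-Java sketch with a list standing in for stdin; a real streaming reducer would read tab-separated lines, in whatever language, with exactly this shape.

```java
import java.util.ArrayList;
import java.util.List;

// What a streaming reducer must do itself: group consecutive
// tab-separated key/value lines by detecting when the key changes.
public class StreamingGroup {
    /** Returns one "key: v1,v2,..." summary per run of identical keys. */
    public static List<String> group(List<String> lines) {
        List<String> groups = new ArrayList<>();
        String currentKey = null;
        StringBuilder values = new StringBuilder();
        for (String line : lines) {
            int tab = line.indexOf('\t');
            String key = line.substring(0, tab);
            String value = line.substring(tab + 1);
            if (!key.equals(currentKey)) {        // key changed: flush the group
                if (currentKey != null) groups.add(currentKey + ": " + values);
                currentKey = key;
                values.setLength(0);
            }
            if (values.length() > 0) values.append(',');
            values.append(value);
        }
        if (currentKey != null) groups.add(currentKey + ": " + values);
        return groups;
    }
}
```

This relies on the sort guarantee discussed in the next message: all lines for a key arrive consecutively.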
Re: How many people is using Hadoop Streaming ?
On Apr 3, 2009, at 10:35 AM, Ricky Ho wrote: I assume that the key is still sorted, right ? That mean I will get all the key1, valueX entries before getting any of the key2 valueY entries and key2 is always bigger than key1. Yes. -- Owen
Re: Hadoop/HDFS for scientific simulation output data analysis
On Apr 3, 2009, at 1:41 PM, Tu, Tiankai wrote: By the way, what is the largest size---in terms of total bytes, number of files, and number of nodes---in your applications? Thanks. The largest Hadoop application that has been documented is the Yahoo Webmap:

10,000 cores
500 TB shuffle
300 TB compressed final output

http://developer.yahoo.net/blogs/hadoop/2008/02/yahoo-worlds-largest-production-hadoop.html -- Owen
Re: datanode but not tasktracker
On Apr 1, 2009, at 12:58 AM, Sandhya E wrote: Hi When the host is listed in slaves file both DataNode and TaskTracker are started on that host. Is there a way in which we can configure a node to be a datanode and not a tasktracker. If you use hadoop-daemons.sh, you can pass a host list. So do:

ssh namenode hadoop-daemon.sh start namenode
ssh jobtracker hadoop-daemon.sh start jobtracker
hadoop-daemons.sh -hosts dfs.slaves start datanode
hadoop-daemons.sh -hosts mapred.slaves start tasktracker

-- Owen
Re: Reduce doesn't start until map finishes
What happened is that we added fast start (HADOOP-3136), which launches more than one task per heartbeat. Previously, if your maps didn't take very long, they finished before the heartbeat and the task tracker was assigned a new map task. A side effect was that no reduce tasks were launched until the maps were complete, which prevented the shuffle from overlapping with the maps. -- Owen
ApacheCon EU 2009 this week
ApacheCon EU 2009 is in Amsterdam this week, with a lot of talks on Hadoop. There are also going to be a lot of the committers there, including Doug Cutting. There is no word yet whether he is bringing the original Hadoop as seen in the NY Times. This year the live video streaming includes the Hadoop track. The video of the keynote and lunch talks are free, but there is a charge for the Hadoop track. The Hadoop track this year includes: * Opening Keynote - Data Management in the Cloud - Raghu Ramakrishnan * Introduction to Hadoop - Owen O'Malley * Hadoop Map-Reduce: Tuning and Debugging - Arun Murthy * Pig: Making Hadoop Easy - Olga Natkovich * Running Hadoop in the Cloud - Tom White * Configuring Hadoop for Grid Services - Allen Wittenauer * Dynamic Hadoop Clusters - Steve Loughran -- Owen
Re: Sorting data numerically
On Mar 23, 2009, at 9:15 AM, tim robertson wrote: If Akira was to write his/her own Mappers, using types like IntWritable would result in it being numerically sorted right? Yes. Or they can use the KeyFieldBasedComparator. I think if you put the following in your job conf, you'll get the right behavior:

mapred.output.key.comparator.class = org.apache.hadoop.mapred.lib.KeyFieldBasedComparator
mapred.text.key.comparator.options = -n

-- Owen
Re: Coordination between Mapper tasks
On Mar 18, 2009, at 10:26 AM, Stuart White wrote: I'd like to implement some coordination between Mapper tasks running on the same node. I was thinking of using ZooKeeper to provide this coordination. This is a very bad idea in the general case. It can be made to work, but you need to have a dedicated cluster so that you are sure they are all active at the same time. Otherwise, you have no guarantee that all of the maps are running at the same time. In most cases, you are much better off using the standard communication between the maps and reduces and making multiple passes of jobs. I think I remember hearing that MapReduce and/or HDFS use ZooKeeper under-the-covers. There are no immediate plans to implement HA yet. -- Owen
Re: Numbers of mappers and reducers
On Mar 17, 2009, at 9:18 AM, Richa Khandelwal wrote: I was going through FAQs on Hadoop to optimize the performance of map/reduce. There is a suggestion to set the number of reducers to a prime number closest to the number of nodes and number of mappers a prime number closest to several times the number of nodes in the cluster. There is no need for the number of reduces to be prime. The only thing it helps is if you are using the HashPartitioner and your key's hash function is too linear. In practice, you usually want to use 99% of your reduce capacity of the cluster. -- Owen
Re: Release batched-up output records at end-of-job?
On Mar 17, 2009, at 5:53 AM, Stuart White wrote: Yeah, I thought of that, but I was concerned that, even if it did work, if it wasn't guaranteed behavior, that it might stop working in a future release. I'll go ahead and give that a try. No, it will continue to work. It is required for both streaming and pipes to work. Can anybody provide details on this new API? Sure, it is in the upcoming 0.20. I made the package org.apache.hadoop.mapreduce so that I could have the new API running in parallel with the old one. You can see the new API in subversion: http://svn.apache.org/repos/asf/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/ -- Owen
Re: Massive discrepancies in job's bytes written/read
On Mar 17, 2009, at 7:44 PM, Bryan Duxbury wrote: There is no compression in the mix for us, so that's not the culprit. I'd be sort of willing to believe that spilling and sorting play a role in this, but, wow, over 10x read and write? That seems like a big problem. It happened recently to me too. It was off by 6x. The strange thing was that the individual tasks looked right. It was just the aggregate that was wrong. -- Owen
Re: Temporary files for mapppers and reducers
Just use the current working directory. Each task gets a unique directory that is erased when the task finishes. -- Owen On Mar 15, 2009, at 16:08, Mark Kerzner markkerz...@gmail.com wrote: Hi, what would be the best place to put temporary files for a reducer? I believe that since reducers each work on their own machine, at their own time, one can do anything, but I would like a confirmation from the experts. Thanks, Mark
Re: null value output from map...
On Mar 13, 2009, at 3:56 PM, Richa Khandelwal wrote: You can initialize IntWritable with an empty constructor. IntWritable i=new IntWritable(); NullWritable is better for that application than IntWritable. It doesn't consume any space when serialized. *smile* -- Owen
Re: Does hadoop-default.xml + hadoop-site.xml matter for whole cluster or each node?
On Mar 7, 2009, at 10:56 PM, pavelkolo...@gmail.com wrote: Does hadoop-default.xml + hadoop-site.xml of master host matter for whole Job or they matter for each node independently? Please never modify hadoop-default. That is for the system defaults. Please use hadoop-site for your configuration. It depends on the property whether they come from the job's configuration or the system's. Some like io.sort.mb and mapred.map.tasks come from the job, while others like mapred.tasktracker.map.tasks.maximum come from the system. The job parameters come from the submitting client, while the system parameters need to be distributed to each worker node. -- Owen For example, if one of them (or both) contains:

<property>
  <name>mapred.map.tasks</name>
  <value>6</value>
</property>

does that mean that six mappers will be executed on all nodes, or 6 on each node? That means that your job will default to 6 maps. mapred.tasktracker.map.tasks.maximum specifies the number of maps running on each node. And yes, we really should do a cleanup of the property names to do something like: mapred.job.* and mapred.system.* to separate the job from the system parameters. -- Owen
Re: question about released version id
On Mar 2, 2009, at 11:46 PM, 鞠適存 wrote: I wonder how to make the hadoop version number. Each 0.18, 0.19 and 0.20 have their own branch. The first release on each branch is 0.X.0, and then 0.X.1 and so on. New features are only put into trunk and only important bug fixes are put into the branches. So there will be no new functionality going from 0.X.1 to 0.X.2, but there will be going from a release of 0.X to 0.X+1. -- Owen
Re: Reducer goes past 100% complete?
On Mar 9, 2009, at 1:00 PM, james warren wrote: Speculative execution has existed far before 0.19.x, but AFAIK the 100% issue has appeared (at least with greater frequency) since 0.19.0 came out. Are you saying there are changes in how task progress is being tracked? In the past, it has usually been when the input is compressed and the code is doing something like uncompressed / total compressed to figure out the done percent. -- Owen
Re: MapReduce jobs with expensive initialization
On Mar 2, 2009, at 3:03 AM, Tom White wrote: I believe the static singleton approach outlined by Scott will work since the map classes are in a single classloader (but I haven't actually tried this). Even easier, you should just be able to do it with static initialization in the Mapper class. (I haven't tried it either... ) -- Owen
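A sketch of the static-initialization approach (the names here are made up for illustration): the static block runs once per classloader, which in Hadoop means once per task JVM, no matter how many times map() is called.

```java
// Expensive one-time setup via static initialization in the Mapper class.
// "MODEL" is a hypothetical stand-in for whatever resource is costly to
// build (dictionaries, connections, loaded models, etc.).
public class ExpensiveSetupMapper {
    static int initCount = 0;          // for demonstration only
    static final String MODEL;

    static {
        // Runs exactly once per classloader, before the first use.
        initCount++;
        MODEL = "loaded-model";
    }

    public String map(String record) {
        return MODEL + ":" + record;   // every call reuses the same setup
    }
}
```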
Re: Shuffle phase
On Feb 26, 2009, at 2:03 PM, Nathan Marz wrote: Do the reducers batch copy map outputs from a machine? That is, if a machine M has 15 intermediate map outputs destined for machine R, will machine R copy the intermediate outputs one at a time or all at once? Currently, one at a time. In 0.21 it will be batched up. -- Owen
Re: Batching key/value pairs to map
On Mon, Feb 23, 2009 at 12:06 PM, Jimmy Wan ji...@indeed.com wrote: part of my map/reduce process could be greatly sped up by mapping key/value pairs in batches instead of mapping them one by one. Can I safely hang onto my OutputCollector and Reporter from calls to map? Yes. You can even use them in the close, so that you can process the last batch of records. *smile* One problem that you will quickly hit is that Hadoop reuses the objects that are passed to map and reduce. So, you'll need to clone them before putting them into the collection. I'm currently running Hadoop 0.17.2.1. Is this something I could do in Hadoop 0.19.X? I don't think any of this changed between 0.17 and 0.19, other than in 0.17 the reduce's inputs were always new objects. In 0.18 and after, the reduce's inputs are reused. -- Owen
Re: Batching key/value pairs to map
On Feb 23, 2009, at 2:19 PM, Jimmy Wan wrote: I'm not sure if this is possible, but it would certainly be nice to either: 1) pass the OutputCollector and Reporter to the close() method. 2) Provide accessors to the OutputCollector and the Reporter. If you look at the 0.20 branch, which hasn't been released yet, there is a new map/reduce api. That api does provide a lot more control. Take a look at Mapper, which provides setup, map, and cleanup hooks: http://tinyurl.com/bquvxq The map method looks like:

/**
 * Called once for each key/value pair in the input split. Most applications
 * should override this, but the default is the identity function.
 */
@SuppressWarnings("unchecked")
protected void map(KEYIN key, VALUEIN value, Context context
                   ) throws IOException, InterruptedException {
  context.write((KEYOUT) key, (VALUEOUT) value);
}

But there is also a run method that drives the task. The default is given below, but it can be overridden by the application.

/**
 * Expert users can override this method for more complete control over the
 * execution of the Mapper.
 * @param context
 * @throws IOException
 */
public void run(Context context) throws IOException, InterruptedException {
  setup(context);
  while (context.nextKeyValue()) {
    map(context.getCurrentKey(), context.getCurrentValue(), context);
  }
  cleanup(context);
}

Clearly, in your application you could override run to make a list of 100 key, value pairs or something. -- Owen
Re: Is there a way to tell whether you're in a map task or a reduce task?
On Feb 10, 2009, at 5:20 PM, Matei Zaharia wrote: I'd like to write a combiner that shares a lot of code with a reducer, except that the reducer updates an external database at the end. The right way to do this is to either do the update in the output format or do something like:

class MyCombiner implements Reducer {
  ...
  public void close() throws IOException {}
}

class MyReducer extends MyCombiner {
  ...
  public void close() throws IOException {
    ... update database ...
  }
}

As far as I can tell, since both combiners and reducers must implement the Reducer interface, there is no way to have this be the same class. There are ways to do it, but they are likely to change. Is there a recommended way to test inside the task whether you're running as a combiner (in a map task) or as a reducer? The question is worse than you think. In particular, the question is *not* whether you are in a map or reduce task. With current versions of Hadoop, the combiner can be called in the context of the reduce as well as the map. You really want to know whether you are in a Reducer or Combiner context. If not, I think this might be an interesting thing to support in the Hadoop 1.0 API. It probably does make sense to add ReduceContext.isCombiner() to answer the question. In practice, when someone wants to use *almost* the same code for the combiner and reducer, I get suspicious of their design. It would enable people to write an AbstractJob class where you just implement map, combine and reduce functions, and can thus write MapReduce jobs in a single Java class. The old api allowed this, since both Mapper and Reducer were interfaces. The new api doesn't because they are both classes. It wouldn't be hard to make a set of adaptors in library code that would work. Basically, you would define a job with SimpleMapper, SimpleCombiner, and SimpleReducer that would call Task.map, Task.combine, and Task.reduce. -- Owen
Re: only one reducer running in a hadoop cluster
On Feb 7, 2009, at 11:52 PM, Nick Cen wrote: Hi, I have a hadoop cluster with 4 pcs. I want to integrate hadoop and lucene together, so I copied some of the source code from nutch's Indexer class, but when I run my job, I found that there is only 1 reducer running on 1 pc, so the performance is not as good as expected. Set mapred.reduce.tasks in your configuration to the number of reduces you want your jobs to have by default. Typically this should be 0.99 * mapred.tasktracker.reduce.tasks.maximum * number of computers.
Re: can't read the SequenceFile correctly
On Feb 6, 2009, at 8:52 AM, Bhupesh Bansal wrote: Hey Tom, I also got burned by this. Why does BytesWritable.getBytes() return non-valid bytes? Or should we add a BytesWritable.getValidBytes() kind of function? It does it because continually resizing the array to the valid length is very expensive. It would be a reasonable patch to add a getValidBytes, but most methods in Java's libraries are aware of this and let you pass in byte[], offset, and length. So once you realize what the problem is, you can work around it. -- Owen
Re: TaskTrackers being double counted after restart job recovery
There is a bug that when we restart the TaskTrackers they get counted twice. The problem is the name is generated from the hostname and port number. When TaskTrackers restart they get a new port number and get counted again. The problem goes away when the old TaskTrackers time out in 10 minutes or you restart the JobTracker. -- Owen
Re: Set the Order of the Keys in Reduce
On Jan 22, 2009, at 7:25 AM, Brian MacKay wrote: Is there a way to set the order of the keys in reduce as shown below, no matter what order the collection in MAP occurs in. The keys to reduce are *always* sorted. If the default order is not correct, you can change the compare function. As Tom points out, the critical thing is making sure that all of the keys that you need to group together go to the same reduce. So let's make it a little more concrete and say that you have:

public class TextPair implements Writable {
  public TextPair() {}
  public void set(String left, String right);
  public String getLeft();
  ...
}

And your map 0 does:

key.set("CAT", "B");
output.collect(key, value);
key.set("DOG", "A");
output.collect(key, value);

While map 1 does:

key.set("CAT", "A");
output.collect(key, value);
key.set("DOG", "B");
output.collect(key, value);

If you want to make sure that all of the cats go to the same reduce and that all of the dogs go to the same reduce, you need to set the partitioner. It would look like:

public class MyPartitioner<V> implements Partitioner<TextPair, V> {
  public void configure(JobConf job) {}
  public int getPartition(TextPair key, V value, int numReduceTasks) {
    return (key.getLeft().hashCode() & Integer.MAX_VALUE) % numReduceTasks;
  }
}

Then define a raw comparator that sorts based on both the left and right part of the TextPair, and you are set. -- Owen
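The masking trick in getPartition can be checked in isolation; a minimal stdlib sketch (the class name is made up for illustration). ANDing with Integer.MAX_VALUE clears the sign bit, so even a negative hashCode produces a partition in [0, numReduceTasks):

```java
public class PartitionDemo {
    // Mirrors the partitioner above: mask the sign bit so negative hash codes
    // still map to a valid partition, then mod by the number of reduces.
    static int partition(String left, int numReduceTasks) {
        return (left.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }
}
```

Every record with the same left half lands on the same reduce, which is exactly the grouping guarantee the partitioner is meant to provide.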
Re: Maven repo for Hadoop
On Jan 17, 2009, at 5:53 PM, Chanwit Kaewkasi wrote: I would like to integrate Hadoop to my project using Ivy. Is there any maven repository containing Hadoop jars that I can point my configuration to? Not yet, but soon. We recently introduced ivy into Hadoop, so I believe we'll upload the pom and jar for 0.20.0 when it is released. -- Owen
Re: Merging reducer outputs into a single part-00000 file
On Jan 14, 2009, at 12:46 AM, Rasit OZDAS wrote: Jim, As far as I know, there is no operation done after Reducer. Correct, other than output promotion, which moves the output file to the final filename. But if you are a little experienced, you already know these. Ordered list means one final file, or am I missing something? There is no value and a lot of cost associated with creating a single file for the output. The question is how you want the keys divided between the reduces (and therefore output files). The default partitioner hashes the key and mods by the number of reduces, which stripes the keys across the output files. You can use the mapred.lib.InputSampler to generate good partition keys and mapred.lib.TotalOrderPartitioner to get completely sorted output based on the partition keys. With the total order partitioner, each reduce gets an increasing range of keys and thus has all of the nice properties of a single file without the costs. -- Owen
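As a sketch of the idea behind TotalOrderPartitioner (not its actual implementation, and the names here are illustrative): N - 1 sorted split points divide the key space into N contiguous ranges, one per reduce, so concatenating the output files in partition order yields a totally sorted result:

```java
import java.util.Arrays;

public class RangePartitionDemo {
    // Hypothetical total-order partitioning: splitPoints must be sorted, and
    // keys below the first point go to partition 0, keys at or above the last
    // point go to the last partition.
    static int partition(String key, String[] splitPoints) {
        int pos = Arrays.binarySearch(splitPoints, key);
        // binarySearch returns (-(insertion point) - 1) when the key is absent.
        return pos >= 0 ? pos + 1 : -(pos + 1);
    }
}
```

InputSampler's job is to pick split points so that these ranges hold roughly equal numbers of keys.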
Re: General questions about Map-Reduce
On Jan 11, 2009, at 5:50 AM, tienduc_dinh wrote: - Does Map-Reduce support parallel writing/reading ? Yes. The maps all read in parallel with each other and the reduces all write in parallel with each other. - What happens after the Map-Reduce operation ? The OutputFormat usually writes the output to HDFS. -- Owen
Re: General questions about Map-Reduce
On Jan 11, 2009, at 7:05 PM, tienduc_dinh wrote: Is there any article which describes it ? Please read the map/reduce tutorial: http://hadoop.apache.org/core/docs/current/mapred_tutorial.html -- Owen
Re: OutputCollector to print only keys
On Jan 8, 2009, at 4:13 AM, Sandhya E wrote: Does Hadoop 0.18 have an outputcollector that can print out only the keys and supress the values. In our case we need only the keys, and hence we do output.collect(key, blank) and then reprocess the entire output file to remove trailing tabs. If you are using TextOutputFormat, you can either emit NullWritables or null. So do: output.collect(key, null); and it should work fine. -- Owen
Re: correct pattern for using setOutputValueGroupingComparator?
This is exactly what the setOutputValueGroupingComparator is for. Take a look at HADOOP-4545, for an example using the secondary sort. If you are using trunk or 0.20, look at src/examples/org/apache/hadoop/examples/SecondarySort.java. The checked in example uses the new map/reduce api that was introduced in 0.20. -- Owen
Re: Map input records(on JobTracker website) increasing and decreasing
Yeah, I've seen this too. I just filed HADOOP-4983. -- Owen
Re: Output.collect uses toString for custom key class. Is it possible to change this?
On Dec 16, 2008, at 9:30 AM, David Coe wrote: Since the SequenceFileOutputFormat doesn't like nulls, how would I use NullWritable? Obviously output.collect(key, null) isn't working. If I change it to output.collect(key, new IntWritable()) I get the result I want (plus an int that I don't), but output.collect(key, new NullWritable()) does not work. Sorry, I answered you literally. You can write a SequenceFile with NullWritables as the values, but you really want optional nulls. I'd probably define a wrapper class like GenericWritable. It would look something like:

class NullableWritable<T extends Writable> implements Writable {
  private T instance;
  private boolean isNull;
  public void setNull(boolean isNull) { this.isNull = isNull; }
  public void readFields(DataInput in) throws IOException {
    isNull = in.readBoolean();
    if (!isNull) { instance.readFields(in); }
  }
  public void write(DataOutput out) throws IOException {
    out.writeBoolean(isNull);
    if (!isNull) { instance.write(out); }
  }
}

-- Owen
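A runnable stdlib sketch of the same null-flag framing, using an Integer payload in place of a Writable (the helper names are made up): a boolean precedes the payload, and the payload bytes are only present when the value is non-null.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class NullableDemo {
    // Serialize: write the isNull flag, then the payload only if present.
    static byte[] write(Integer value) throws IOException {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(buf);
        out.writeBoolean(value == null);
        if (value != null) out.writeInt(value);
        return buf.toByteArray();
    }

    // Deserialize: read the flag first, then conditionally read the payload.
    static Integer read(byte[] bytes) throws IOException {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes));
        return in.readBoolean() ? null : in.readInt();
    }
}
```

A null round-trips as a single flag byte, while a present value carries the flag plus its normal serialized form.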
Re: Output.collect uses toString for custom key class. Is it possible to change this?
On Dec 16, 2008, at 8:28 AM, David Coe wrote: Is there a way to output the write() instead? Use SequenceFileOutputFormat. It writes binary files using the write. The reverse is SequenceFileInputFormat, which reads the sequence files using readFields. -- Owen
Re: [video] visualization of the hadoop code history
On Dec 16, 2008, at 12:36 AM, Stefan Groschupf wrote: It is a neat way of visualizing who is behind the Hadoop source code and how the project code base grew over the years. It is interesting, but it would be more interesting to track the authors of the patch rather than the committer. The two are rarely the same. I've got some scripts for parsing the CHANGES.txt that pull all of that apart for each jira. I really should figure out a good place to check those in. *smile* -- Owen
Re: Output.collect uses toString for custom key class. Is it possible to change this?
On Dec 16, 2008, at 9:14 AM, David Coe wrote: Does the SequenceFileOutputFormat work with NullWritable as the value? Yes.
Re: concurrent modification of input files
On Dec 15, 2008, at 8:08 AM, Sandhya E wrote: I have a scenario where, while a map/reduce is working on a file, the input file may get deleted and copied with a new version of the file. All my files are compressed and hence each file is worked on by a single node. I tried simulating the scenario of deleting the file before mapreduce was over, and map/reduce went ahead without complaining. Can I assume this will be the case always. No, the results will be completely non-deterministic. Don't do this. That said, the thing that will save you in micro-tests of this is that if the file is missing at some point, the task will fail and retry. -- Owen
Re: Simple data transformations in Hadoop?
On Dec 13, 2008, at 6:32 PM, Stuart White wrote: First question: would Hadoop be an appropriate tool for something like this? Very What is the best way to model this type of work in Hadoop? As a map-only job with number of reduces = 0. I'm thinking my mappers will accept a Long key that represents the byte offset into the input file, and a Text value that represents the line in the file. Sure, just use TextInputFormat. You'll want to set the minimum split size (mapred.min.split.size) to a large number so that you get exactly 1 map per an input file. I *could* simply uppercase the text lines and write them to an output file directly in the mapper (and not use any reducers). So, there's a question: is it considered bad practice to write output files directly from mappers? You could do it directly, but I would suggest that using the TextOutputFormat is easier. Your map should just do: collect(null, upperCaseLine); Assuming that number of reduces is 0, the output of the map goes straight to the OutputCollector. Assuming it's advisable in this example to write a file directly in the mapper - how should the mapper create a unique output partition file name? Is there a way for a mapper to know its index in the total # of mappers? Get mapred.task.partition from the configuration. Assuming it's inadvisable to write a file directly in the mapper - I can output the records to the reducers using the same key and using the uppercased data as the value. Then, in my reducer, should I write a file? Or should I collect() the records in the reducers and let hadoop write the output? See above, but with no reduces the data is not sorted. If you pass a null or NullWritable to the TextOutputFormat, it will not add the tab. -- Owen
Re: Suggestions of proper usage of key parameter ?
On Dec 14, 2008, at 4:47 PM, Ricky Ho wrote: Yes, I am referring to the key INPUT INTO the map() function and the key EMITTED FROM the reduce() function. Can someone explain why we need a key in these cases and what is the proper use of it? It was a design choice and could have been done as: R1 -> map -> (K,V) -> reduce -> R2 instead of (K1,V1) -> map -> (K2,V2) -> reduce -> (K3,V3), but since the input of the reduce is sorted on K2, the output of the reduce is also typically sorted and therefore keyed. Since jobs are often chained together, it makes sense to make the reduce input match the map input. Of course everything you could do with the first option is possible with the second using either K1 = R1 or V1 = R1. Having the keys is often convenient... Who determines what the key should be? (the corresponding InputFormat implementation class?) The InputFormat makes the choice. In this case, what is the key in the map() call? (the name of the input file?) TextInputFormat uses the byte offset as the key and the line as the value. What if the reduce() function emits multiple key, value entries or doesn't emit any entry at all? Is this considered OK? Yes. What if the reduce() function emits a key, value entry whose key is not the same as the input key parameter to the reduce() function? Is this OK? Yes, although the reduce output is not re-sorted, so the results won't be sorted unless K3 is a subset of K2. If two Map/Reduce cycles are chained together, is the key input into the 2nd round map() function determined by the key emitted from the 1st round reduce() function? Yes. -- Owen
Re: Reset hadoop servers
On Dec 9, 2008, at 2:22 AM, Devaraj Das wrote: I know that the tasktracker/jobtracker doesn't have any command for re-reading the configuration. There is built-in support for restart/shut-down but those are via external scripts that internally do a kill/start. Actually, HADOOP-4348 does add a refresh command to map/reduce. I'm not sure if it re-reads the entire config or just part of it. -- Owen
Re: End of block/file for Map
On Dec 9, 2008, at 11:35 AM, Songting Chen wrote: Is there a way for the Map process to know it's the end of records? I need to flush some additional data at the end of the Map process, but wondering where I should put that code. The close() method is called at the end of the map. -- Owen
Re: End of block/file for Map
On Dec 9, 2008, at 7:34 PM, Aaron Kimball wrote: That's true, but you should be aware that you no longer have an OutputCollector available in the close() method. True, but in practice you can keep a handle to it from the map method and it will work perfectly. This is required for both streaming and pipes to work. (Both of them do their processing asynchronously, so the close needs to wait for the subprocess to finish. Because of this, the contract with the Mapper and Reducer is very loose and the collect method may be called in between calls to the map method.) In the context object api (HADOOP-1230), the api will include the context object in cleanup, to make it clear that cleanup can also write records. -- Owen
Re: How are records with equal key sorted in hadoop-0.18?
On Dec 8, 2008, at 8:02 AM, Christian Kunz wrote: Comparing hadoop-default.xml of hadoop-0.18 with hadoop-0.17, didn't map.sort.class change from org.apache.hadoop.mapred.MergeSorter to org.apache.hadoop.util.QuickSort? Yes, but the quick sort is only used in the mapper. The reducer already has sorted runs and therefore only needs an external merge sort. The primary change in 0.18 for the reducer was HADOOP-2095. What were the values of io.sort.factor and io.file.buffer.size? Christian, can you get the heap profile for one of the reduces that is failing?

mapred.task.profile=true
mapred.task.profile.maps=        (empty string, since we don't want any maps)
mapred.task.profile.reduces=     (the number of the reduce to profile)
mapred.task.profile.params=-agentlib:hprof=heap=sites,force=n,thread=y,verbose=n,file=%s

-- Owen
Re: getting Configuration object in mapper
On Dec 4, 2008, at 9:19 PM, abhinit wrote: I have set some variables using the JobConf object, e.g. jobConf.set("Operator", operator). How can I get an instance of the Configuration/JobConf object inside a map method so that I can retrieve these variables? In your Mapper class, implement a method like:

public void configure(JobConf job) { ... }

This will be called, with the job conf, when the object is created. -- Owen
Re: Hadoop Internal Architecture writeup
On Nov 28, 2008, at 9:45 AM, Ricky Ho wrote: [Ricky] What exactly does the job.split contains ? I assume it contains the specification for each split (but not its data), such as what is the corresponding file and the byte range within that file. Correct ? Yes [Ricky] I am curious about why can't the reduce execution start earlier (before all the map tasks completed). The contract is that each reduce is given the keys in sorted order. The reduce can't start until it is sure it has the first key. That can only happen after the maps are all finished. [Ricky] Do you mean if the job has 5000 splits, then it requires 5000 TaskTrackers VM (one for each split) ? In 0.19, you can enable the framework to re-use jvms between tasks in the same job. Look at HADOOP-249. [Ricky] Is this a well-know folder within the HDFS ? It is configured by the cluster admin. However, applications should *not* depend on the contents or even visibility of that directory. It will almost certainly become inaccessible to clients as part of increasing security. -- Owen
Re: How to let Reducer know on which partition it is working
On Nov 26, 2008, at 4:35 AM, Jürgen Broß wrote: I'm not sure how to let a Reducer know in its configure() method which partition it will get from the Partitioner, From: http://hadoop.apache.org/core/docs/r0.19.0/mapred_tutorial.html#Task+JVM+Reuse look for mapred.task.partition, which is a number from 0 to # reduces - 1. -- Owen
Re: Block placement in HDFS
On Nov 24, 2008, at 8:44 PM, Mahadev Konar wrote: Hi Dennis, I don't think that is possible to do. No, it is not possible. The block placement is determined by HDFS internally (which is local, rack local and off rack). Actually, it was changed in 0.17 or so to be node-local, off-rack, and a second node off rack. -- Owen
Re: A question about the combiner, reducer and the Output value class: can they be different?
On Sun, Nov 16, 2008 at 2:18 PM, Saptarshi Guha [EMAIL PROTECTED] wrote: Hello, If my understanding is correct, the combiner will read in values for a given key, process it, output it and then **all** values for a key are given to the reducer. Not quite. The flow looks like: RecordReader -> Mapper -> Combiner* -> Reducer -> OutputFormat. The Combiner may be called 0, 1, or many times on each key between the mapper and reducer. Combiners are just an application-specific optimization that compresses the intermediate output. They should not have side effects or transform the types. Unfortunately, since there isn't a separate interface for Combiners, there isn't a great place to document this requirement. I've just filed HADOOP-4668 to improve the documentation. Then it ought to be possible for the combiner to be of the form Reducer<IntWritable, Text, IntWritable, BytesWritable> and the reducer Reducer<IntWritable, BytesWritable, IntWritable, Text>. Since the combiner may be called an arbitrary number of times, it must have the same input and output types. So the parts generically look like:

input: InputFormat<K1,V1>
mapper: Mapper<K1,V1,K2,V2>
combiner: Reducer<K2,V2,K2,V2>
reducer: Reducer<K2,V2,K3,V3>
output: RecordWriter<K3,V3>

so you probably need to move the code that was changing the type into the last step of the mapper. -- Owen
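The "called 0, 1, or many times" rule is exactly why the combiner's input and output types must match. A toy sum combiner (hypothetical names, stdlib only) shows the required property: applying it any number of times before the reduce never changes the final answer.

```java
import java.util.List;

public class CombinerDemo {
    // A sum combiner has the same input and output types (Integer -> Integer),
    // so its output can be fed back into itself or into the reducer.
    static List<Integer> combine(List<Integer> values) {
        return List.of(values.stream().mapToInt(Integer::intValue).sum());
    }

    // The reducer computes the same aggregation over whatever it receives.
    static int reduce(List<Integer> values) {
        return values.stream().mapToInt(Integer::intValue).sum();
    }
}
```

If the combiner changed the types, the second invocation (on already-combined data) would be handed input it cannot accept, which is why type-changing logic belongs in the mapper instead.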
Re: Writing to multiple output channels
On Nov 13, 2008, at 9:38 PM, Sunil Jagadish wrote: I have a mapper which needs to write output into two different kinds of files (output.collect()). For my purpose, I do not need any reducers. Set the number of reduces to 0. Open a sequence file in the mapper and write the second stream to it. You'll end up with two files per a mapper. -- Owen
Re: reduce more than one way
On Nov 7, 2008, at 12:35 PM, Elia Mazzawi wrote: I have 2 hadooop map/reduce programs that have the same map, but a different reduce methods. can i run them in a way so that the map only happens once? If the input to the reduces is the same, you can put the two reduces together and use one of the multiple output libraries. That will let your reducer produce two different output directories. -- Owen
Re: Hadoop Design Question
On Nov 6, 2008, at 11:29 AM, Ricky Ho wrote: Disk I/O overhead == - The output of a Map task is written to a local disk and then later on uploaded to the Reduce task. While this enables a simple recovery strategy when the map task fails, it incurs additional disk I/O overhead. That is correct. However, Linux does very well at using extra ram for buffer caches, so as long as you enable write behind it won't be a performance problem. You are right that the primary motivation is both recoverability and not needing the reduces running until after maps finish. So I am wondering if there is an option to bypass the step of writing the map result to the local disk. Currently no. - In the current setting, it sounds like no reduce task will be started before all map tasks have completed. If there are a few slow running map tasks, the whole job will be delayed. The application's reduce function can't start until the last map finishes because the input to the reduce is sorted. Since the last map may generate the first keys that must be given to the reduce, the reduce must wait. - The overall job execution can be shortened if the reduce tasks can start processing as soon as some map results are available rather than waiting for all the map tasks to complete. But it would violate the specification of the framework that the input to reduce is completely sorted. - Therefore it is impossible for the reduce phase of Job1 to stream its output data to a file while the map phase of Job2 starts reading the same file. Job2 can only start after ALL REDUCE TASKS of Job1 are completed, which makes pipelining between jobs impossible. It is currently not supported, but the framework could be extended to let the client add input splits after the job has started. That would remove the hard synchronization between jobs. - This means the partitioning function has to be chosen carefully to make sure the workload of the reduce processes is balanced (maybe not a big deal). Yes, the partitioner must balance the workload between the reduces. - Are there any thoughts of running a pool of reduce tasks on the same key and having them combine their results later? That is called the combiner. It is called multiple times as the data is merged together. See the word count example. If the reducer does data reduction, using combiners is very important for performance. -- Owen
Re: More Hadoop Design Question
On Nov 6, 2008, at 2:30 PM, Ricky Ho wrote: Hmmm, sounds like the combiner is invoked after the map() process has completed for the file split. No. The data path is complex, but the combiner is called when the map outputs are being spilled to disk. So roughly, the map will output key, value pairs until the io.sort.mb buffer is full, then the contents are sorted and fed to the combiner. The output of the combiner is written to disk. When there are enough spills on disk, it will merge them together, call the combiner, and write to disk. When the map finishes, the final multi-level merge is done. Since the reduce is also doing a multi-level sort, it will also call the combiner when a merge is done (other than the final merge, which is fed into the reduce). That means, before the combiner function starts, all the intermediate map() output will be kept in memory? Any comment on the memory footprint? The memory is bound by io.sort.mb. I think a sufficient condition is just to make sure the reduce task will not COMPLETE before all the map tasks have completed. We don't need to make sure the reduce task will not START before all map tasks have completed. This can be achieved easily by letting the iterator.next() call within the reduce() method block. *Sigh* no. The reduce function is invoked once per unique key, and it is called in ascending order of keys. The final map may produce a's when previously you had only seen b's and c's; once you have called reduce with the b's, you can't later call it with the a's. There is another potential issue in the reduce() API: can you explain why we need to expose the OutputCollector to the reduce() method? For example, is it possible for the key in output.collect() to be a different key from the reduce method parameter? What happens if two reduce calls (starting with different keys) write their output with the same key? The reduce is allowed to have different input and output types. 
There are *four* type parameters: Reducer<KeyIn, ValueIn, KeyOut, ValueOut>. The output of the reduce is not re-sorted. If the reduce doesn't use the same key as the input, the output of the reduce won't be sorted. Duplicate keys on reduce output (either within the same reduce or different ones) are not a problem for the framework. However, this requires some change of the current Reducer interface. Currently the reduce() method is called once per key. We want that to be called once per map result (within the same key). What I mean is the following interface ... There is a library that lets you run a chain of maps, if that is the semantics you are looking for. For map/reduce, the sort is a very fundamental piece. If you don't need the sort between map and reduce, you can set reduces = 0 and run much faster. Does it make sense? Not really. Most map/reduce applications need the other semantics. -- Owen
Re: key,Values at Reducer are local or present in DFS
On Nov 5, 2008, at 2:21 PM, Tarandeep Singh wrote: I want to know whether the key,values received by a particular reducer at a node are stored locally on that node or are stored on DFS (and hence replicated over cluster according to replication factor set by user) Map outputs (and reduce inputs) are stored on local disk and not HDFS. The data is moved between computers via http. One more question- How does framework replicates the data? Say Node A writes a file, is it guaranteed that atleast one copy will be stored on node A? HDFS writes all of the replicas in parallel. For the most part, it writes to the local (same node) DataNode first, that DataNode sends it to a DataNode on another rack, and that DataNode sends it to a third DataNode on the other rack. -- Owen
Re: Any Way to Skip Mapping?
If you don't need a sort, which is what it sounds like, Hadoop supports that by turning off the reduce. That is done by setting the number of reduces to 0. This typically is much faster than if you need the sort. It also sounds like you may need/want the library that does map-side joins. http://tinyurl.com/43j5pp -- Owen
ApacheCon US 2008
Just a reminder that ApacheCon US is next week in New Orleans. There will be a lot of Hadoop developers and talks. (I'm CC'ing core-user because it has the widest coverage. Please join the low traffic [EMAIL PROTECTED] list for cross sub-project announcements.)

* Hadoop Camp with lots of talks about Hadoop
  o Introduction to Hadoop by Owen O'Malley
  o A Tour of Apache Hadoop by Tom White
  o Programming Hadoop Map/Reduce by Arun Murthy
  o Hadoop at Yahoo! by Eric Baldeschwieler
  o Hadoop Futures Panel
  o Using Hadoop for an Intranet Search Engine by Shivakumar Vaithyanthan
  o Cloud Computing Testbed by Thomas Sandholm
  o Improving Virtualization and Performance Tracing of Hadoop with Open Solaris by George Porter
  o An Insight into Hadoop Usage at Facebook by Dhruba Borthakur
  o Pig by Alan Gates
  o Zookeeper, Coordinating the Distributed Application by Ben Reed
  o Querying JSON Data on Hadoop using Jaql by Kevin Beyer
  o HBase by Michael Stack
* Hadoop training on Practical Problem Solving in Hadoop
* Cloudera is providing a test Hadoop cluster and a Hadoop hacking contest.

There is also a new Hadoop tutorial available. -- Owen
Re: Mapper settings...
On Oct 31, 2008, at 3:15 PM, Bhupesh Bansal wrote: Why do we need these setters in JobConf ?? jobConf.setMapOutputKeyClass(String.class); jobConf.setMapOutputValueClass(LongWritable.class); Just historical. The Mapper and Reducer interfaces didn't use to be generic. (Hadoop used to run on Java 1.4 too...) It would be nice to remove the need to call them. There is an old bug open to check for consistency HADOOP-1683. It would be even better to make the setting of both the map and reduce output types optional if they are specified by the template parameters. -- Owen
Re: Is there a unique ID associated with each task?
On Oct 30, 2008, at 9:03 AM, Joel Welling wrote: I'm writing a Hadoop Pipes application, and I need to generate a bunch of integers that are unique across all map tasks. If each map task has a unique integer ID, I can make sure my integers are unique by including that integer ID. I have this theory that each map task has a unique identifier associated with some configuration parameter, but I don't know the name of that parameter. Is there an integer associated with each task? If so, how do I get it? While we're at it, is there a way to get the total number of map tasks? There is a unique identifier for each task and even each task attempt. From: http://hadoop.apache.org/core/docs/current/mapred_tutorial.html#Task+Execution+%26+Environment

mapred.tip.id    the task id
mapred.task.id   the task attempt id

Since you probably don't care about re-execution, you probably want mapred.tip.id. If you just want the number of the mapred.tip.id as an integer, you can use mapred.task.partition, which will be a number from 0 to M - 1 for maps or 0 to R - 1 for reduces. -- Owen
Re: I am attempting to use setOutputValueGroupingComparator as a secondary sort on the values
I uploaded a patch that does a secondary sort. Take a look at: https://issues.apache.org/jira/browse/HADOOP-4545 It reads input with two numbers per line, such as:

-1 -4
-3 23
5 10
-1 -2
-1 300
-1 10
4 1
4 2
4 10
4 -1
4 -10
10 20
10 30
10 25

And produces output like (with 2 reduces):

part-0:
4 -10
4 -1
4 1
4 2
4 10
10 20
10 25
10 30

part-1:
-3 23
-1 -4
-1 -2
-1 10
-1 300
5 10

-- Owen
Re: Understanding file splits
On Oct 28, 2008, at 6:29 AM, Malcolm Matalka wrote: I am trying to write an InputFormat and I am having some trouble understanding how my data is being broken up. My input is a previous hadoop job and I have added code to my record reader to print out the FileSplit's start and end position, as well as where the last record I read was located. My records are all about 100 bytes, so fairly small. For one file I am seeing the following output:

start: 0 end: 45101881 pos: 67108800
start: 45101880 end: 90203762 pos: 67108810
start: 90203761 end: 135305643 pos: 134217621
start: 135305642 end: 180170980 pos: 180170902

It would help if you printed the FileSplits themselves, so that we can see the file names. I don't know where the pos is coming from. FileSplits only have offset and length. Note, I have also specified in my InputFormat that isSplittable return false. That isn't working. Otherwise, you would only have FileSplits that start at 0. I do not understand why there is overlap. Note that on the second one, I never appear to reach the end position. The RecordReaders have to read more than their split because they can only process whole records. So, in the case of text files and TextInputFormat, the split is picked blindly. Then all of the splits that don't start at offset 0, read until they reach a newline. They start from there and read until the newline *past* the end of the split. That way all of the data is processed and no partial records are processed. -- Owen
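A self-contained sketch of that rule, using a String in place of a file (the names are illustrative, not Hadoop's actual LineRecordReader API): a split that doesn't start at offset 0 backs up one byte and discards everything through the next newline (that line belongs to the previous split), and every split reads through the newline past its end, so each line lands in exactly one split.

```java
import java.util.ArrayList;
import java.util.List;

public class SplitDemo {
    // Return the records owned by the split [start, end) of data,
    // where records are newline-separated lines.
    static List<String> readSplit(String data, int start, int end) {
        int pos = start;
        if (start != 0) {
            // Back up one byte and skip through the next newline; the
            // previous split owns that (possibly partial) line.
            int nl = data.indexOf('\n', start - 1);
            if (nl < 0) return List.of();
            pos = nl + 1;
        }
        List<String> records = new ArrayList<>();
        // Read every line that *starts* before end, even if it extends past it.
        while (pos < data.length() && pos < end) {
            int nl = data.indexOf('\n', pos);
            if (nl < 0) nl = data.length();
            records.add(data.substring(pos, nl));
            pos = nl + 1;
        }
        return records;
    }
}
```

Running the two splits of "aa\nbb\ncc" through this shows overlap in bytes read but no record processed twice, which is the behavior described above.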
Re: I am attempting to use setOutputValueGroupingComparator as a secondary sort on the values
On Oct 28, 2008, at 7:53 AM, David M. Coe wrote: My mapper is Mapper<LongWritable, Text, IntWritable, IntWritable> and my reducer is the identity. I configure the program using:

conf.setOutputKeyClass(IntWritable.class);
conf.setOutputValueClass(IntWritable.class);
conf.setMapperClass(MapClass.class);
conf.setReducerClass(IdentityReducer.class);
conf.setOutputKeyComparatorClass(IntWritable.Comparator.class);
conf.setOutputValueGroupingComparator(IntWritable.Comparator.class);

The problem is that your map output key needs to look like:

class IntPair implements Writable {
  private int left;
  private int right;
  public void set(int left, int right) { ... }
  public int getLeft() { ... }
  public int getRight() { ... }
}

Your Mapper should be Mapper<LongWritable, Text, IntPair, IntWritable> and should emit:

IntPair key = new IntPair();
IntWritable value = new IntWritable();
...
key.set(keyValue, valueValue);
value.set(valueValue);
output.collect(key, value);

Your sort comparator should compare both left and right in the pair. The grouping comparator should only look at left in the pair. Your Reducer should be Reducer<IntPair, IntWritable, IntWritable, IntWritable> and should emit:

output.collect(new IntWritable(key.getLeft()), value);

Is that clearer? -- Owen
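The effect of the two comparators can be demonstrated without Hadoop. This is a plain-JDK sketch of the pair-key idea (the IntPair class and method names here are illustrative stand-ins, not the Hadoop Writable plumbing): sort with a comparator over both fields, then group with a comparator over the left field only, and each "reduce call" sees its values already sorted.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

public class SecondarySortDemo {
    // Stand-in for the IntPair key sketched in the mail above.
    static final class IntPair {
        final int left;
        final int right;
        IntPair(int left, int right) { this.left = left; this.right = right; }
    }

    // Simulates the reduce-side view: returns one "left: v1 v2 ..." string
    // per reduce call.
    static List<String> reduceGroups(List<IntPair> pairs) {
        List<IntPair> sorted = new ArrayList<>(pairs);
        // Sort comparator (setOutputKeyComparatorClass): both fields.
        sorted.sort(Comparator.<IntPair>comparingInt(p -> p.left)
                              .thenComparingInt(p -> p.right));
        List<String> calls = new ArrayList<>();
        int i = 0;
        while (i < sorted.size()) {
            // Grouping comparator (setOutputValueGroupingComparator): only
            // left, so all rights for one left land in a single call.
            int left = sorted.get(i).left;
            StringBuilder line = new StringBuilder(left + ":");
            while (i < sorted.size() && sorted.get(i).left == left) {
                line.append(' ').append(sorted.get(i).right);
                i++;
            }
            calls.add(line.toString());
        }
        return calls;
    }

    public static void main(String[] args) {
        List<IntPair> shuffled = Arrays.asList(
            new IntPair(4, 10), new IntPair(4, -1), new IntPair(10, 30),
            new IntPair(4, 2), new IntPair(10, 20));
        System.out.println(reduceGroups(shuffled));
        // [4: -1 2 10, 10: 20 30]
    }
}
```

If both comparators looked at both fields (the situation in the original question, where both were IntWritable.Comparator), each (left, right) pair would become its own group and the secondary sort would be invisible.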
Re: help: InputFormat problem ?
If the application you are drawing from is doing some sort of web crawl that connects to lots of random servers, you may want to use MultiThreadedMapRunner and do the remote connections in the map. If you are just connecting to a small set of servers for each map, you should put the connection logic in the InputFormat. Using MultiThreadedMapRunner means that each map can have multiple threads transferring data from the external sources. -- Owen
Re: Help: How to change number of mappers in Hadoop streaming?
On Oct 26, 2008, at 8:38 AM, chaitanya krishna wrote: I forgot to mention that although the number of map tasks is set in the code as I mentioned before, the actual number of map tasks is not necessarily the same number, but is very close to it. The number of reduces is precisely the one configured by the job. The number of maps depends on the InputFormat selected. For FileInputFormats, which include TextInputFormat and SequenceFileInputFormat, the formula is complicated, but it usually defaults to the greater of the number requested and the number of hdfs blocks in the input. -- Owen
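For intuition, here is a rough model of how the FileInputFormat of that era sized splits (a sketch, not the exact Hadoop source, which also splits per file and handles remainders): goalSize = totalSize / requestedMaps, then splitSize = max(minSize, min(goalSize, blockSize)).

```java
public class MapCountDemo {
    // Approximate split sizing of the old mapred FileInputFormat.
    static long splitSize(long totalSize, int requestedMaps,
                          long minSize, long blockSize) {
        long goalSize = totalSize / Math.max(requestedMaps, 1);
        return Math.max(minSize, Math.min(goalSize, blockSize));
    }

    static long numSplits(long totalSize, int requestedMaps,
                          long minSize, long blockSize) {
        long size = splitSize(totalSize, requestedMaps, minSize, blockSize);
        return (totalSize + size - 1) / size;  // ceiling division
    }

    public static void main(String[] args) {
        long mb = 1024 * 1024;
        // A 1 GB input with 128 MB blocks: asking for 4 maps still yields 8,
        // because the split size is capped at the block size.
        System.out.println(numSplits(1024 * mb, 4, 1, 128 * mb));   // 8
        // Asking for 16 maps gives 16 splits of 64 MB each.
        System.out.println(numSplits(1024 * mb, 16, 1, 128 * mb));  // 16
    }
}
```

This matches Owen's summary: the effective map count is roughly the greater of the number requested and the number of blocks, which is why the requested number is honored only approximately.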
Re: Any one successfully ran the c++ pipes example?
On Oct 16, 2008, at 1:40 PM, Zhengguo 'Mike' SUN wrote: I was trying to write an application using the pipes api, but it seemed the serialization part is not working correctly. More specifically, I can't deserialize a string from a StringInStream constructed from context.getInputSplit(). Even with the examples bundled in the distribution archive (wordcount-nopipe.cc), it threw exceptions. If anyone has experience with that, please kindly give some advice. So you mean the example with a C++ record reader? You have to use the InputFormat that generates input splits that consist of strings. Look at src/test/org/apache/hadoop/pipes/WordCountInputFormat.java It would be useful to have a C++ impl of FileInputSplit too... -- Owen
Re: Distributed cache Design
On Oct 16, 2008, at 1:52 PM, Bhupesh Bansal wrote: We at Linkedin are trying to run some large graph analysis problems on Hadoop. The fastest way to run would be to keep a copy of the whole graph in RAM at all mappers. (Graph size is about 8G in RAM.) We have a cluster of 8-core machines with 8G on each. The best way to deal with it is *not* to load the entire graph in one process. In the WebMap at Yahoo, we have a graph of the web that has roughly 1 trillion links and 100 billion nodes. See http://tinyurl.com/4fgok6 . To invert the links, you process the graph in pieces and resort based on the target. You'll get much better performance and scale to almost any size. What is the best way of doing that? Is there a way so that multiple mappers on the same machine can access a RAM cache? I read about the Hadoop distributed cache; it looks like it copies the file (hdfs / http) locally onto the slaves, but not necessarily into RAM. You could mmap the file from distributed cache using MappedByteBuffer. Then there will be one copy between jvms... -- Owen
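The mmap suggestion looks roughly like this in plain Java. This is a minimal sketch, not Hadoop code: the file path here is a temp-file stand-in for a file that the DistributedCache would have localized on the node. Because the mapping is backed by the OS page cache, several task JVMs on the same machine mapping the same file share one physical copy of the pages.

```java
import java.io.IOException;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class MmapDemo {
    // Map a local file read-only into memory; the buffer remains valid
    // after the channel is closed.
    static MappedByteBuffer mapReadOnly(Path path) throws IOException {
        try (FileChannel ch = FileChannel.open(path, StandardOpenOption.READ)) {
            return ch.map(FileChannel.MapMode.READ_ONLY, 0, ch.size());
        }
    }

    public static void main(String[] args) throws IOException {
        // Stand-in for a cached graph file; path invented for the demo.
        Path cached = Files.createTempFile("graph", ".bin");
        Files.write(cached, new byte[] {1, 2, 3, 4});
        MappedByteBuffer buf = mapReadOnly(cached);
        System.out.println(buf.capacity());  // 4
        System.out.println(buf.get(2));      // 3
    }
}
```

Note one pre-NIO.2 limitation of this approach: a single MappedByteBuffer is indexed by int, so mapping a file larger than 2 GB requires mapping it in several regions.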
Re: Distributed cache Design
On Oct 16, 2008, at 3:09 PM, Bhupesh Bansal wrote: Let's say I want to implement a DFS in my graph. I am not able to picture implementing it with the graph in pieces without putting a depth bound of (3-4) on it. Let's say we have 200M (4GB) edges to start with. Start by watching the lecture on graph algorithms in map/reduce: http://www.youtube.com/watch?v=BT-piFBP4fE And see if that makes it clearer. If not, ask more questions. *smile* -- Owen
Re: Seeking examples beyond word count
On Oct 14, 2008, at 8:37 PM, Bert Schmidt wrote: I'm trying to think of how I might use it, yet all the examples I find are variations of word count. Look in the src/examples directory:

PiEstimator - estimates the value of pi using distributed brute force
Pentomino - solves Pentomino tile placement problems, including one-sided variants
Terasort - tools to generate the required data, sort it into a total order, and verify the sort order

There is also distcp in src/tools that uses map/reduce to copy a lot of files between clusters. Are there any interesting examples of how people are using it for real tasks? A final pointer would be to Nutch, which uses Hadoop for distribution. -- Owen
Re: Sharing an object across mappers
On Oct 3, 2008, at 7:49 AM, Devajyoti Sarkar wrote: Briefly going through the DistributedCache information, it seems to be a way to distribute files to mappers/reducers. Sure, but it handles the distribution problem for you. One still needs to read the contents into each map/reduce task VM. If the data is straight binary data, you could just mmap it from the various tasks. It would be pretty efficient. The other direction is to use the MultiThreadedMapRunner and run multiple maps as threads in the same VM. But unless your maps are CPU heavy or contacting external servers, it probably won't help as much as you'd like. -- Owen