Re: Multicluster Communication
On Fri, Jun 19, 2009 at 4:06 PM, Rakhi Khatwani rakhi.khatw...@gmail.com wrote: we want hadoop cluster 1 for collecting data and storing it in HDFS, and hadoop cluster 2 for using the stored data from HDFS and analysing it. Why do you want to do this in the first place? It seems like you want cluster1 to be a plain HDFS cluster and cluster2 to be a mapred cluster. Doing something like that will be disastrous - Hadoop is all about sending computation closer to your data. If you don't want that, you need not even use hadoop. -- Harish Mallipeddi http://blog.poundbang.in
Re: Multicluster Communication
Hi Harish, I want both of them to be compute clusters, but then how would they have a common storage area? We basically want to separate the collection from the analysis. Is it possible to dedicate a set of nodes in the hadoop cluster only for collection and another set of nodes in the same cluster only for analysis? Regards Raakhi On Fri, Jun 19, 2009 at 4:19 PM, Harish Mallipeddi harish.mallipe...@gmail.com wrote: On Fri, Jun 19, 2009 at 4:06 PM, Rakhi Khatwani rakhi.khatw...@gmail.com wrote: we want hadoop cluster 1 for collecting data and storing it in HDFS, and hadoop cluster 2 for using the stored data from HDFS and analysing it. Why do you want to do this in the first place? It seems like you want cluster1 to be a plain HDFS cluster and cluster2 to be a mapred cluster. Doing something like that will be disastrous - Hadoop is all about sending computation closer to your data. If you don't want that, you need not even use hadoop. -- Harish Mallipeddi http://blog.poundbang.in
sleep 60 between start-dfs.sh and putting files. Is it normal?
Hello. How can I ensure that the cluster is up? Right now I am using sleep 60 between start-dfs.sh and putting files into the input directory... Thanks.
Re: sleep 60 between start-dfs.sh and putting files. Is it normal?
Hi Pavel, You should use hadoop dfsadmin -safemode wait after starting your cluster. This will wait for the namenode to exit safe mode so you can begin making modifications. -Todd On Fri, Jun 19, 2009 at 9:03 AM, pavel kolodin pavelkolo...@gmail.com wrote: Hello. How can I ensure that the cluster is up? Right now I am using sleep 60 between start-dfs.sh and putting files into the input directory... Thanks.
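If you script this from Java instead of the shell, the same wait can be expressed through the client API; a minimal sketch assuming Hadoop 0.20's package layout (earlier releases kept these classes under org.apache.hadoop.dfs):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.FSConstants;

public class WaitForSafeModeExit {
  public static void main(String[] args) throws Exception {
    DistributedFileSystem dfs =
        (DistributedFileSystem) FileSystem.get(new Configuration());
    // SAFEMODE_GET only queries the state; poll until the namenode leaves it
    while (dfs.setSafeMode(FSConstants.SafeModeAction.SAFEMODE_GET)) {
      Thread.sleep(1000);
    }
  }
}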
Re: Restrict output of mappers to reducers running on same node?
Yes, you are correct. I had not thought about sharing a file handle through multiple tasks via jvm reuse. On Thu, Jun 18, 2009 at 9:43 AM, Tarandeep Singh tarand...@gmail.com wrote: Jason, correct me if I am wrong- opening a Sequence file in the configure (or setup method in 0.20) and writing to it is the same as doing output.collect(), unless you mean I should make the sequence file writer a static variable and set the reuse-jvm flag to -1. In that case the subsequent mappers might be run in the same JVM and they can use the same writer and hence produce one file. But in that case I need to add a hook to close the writer - maybe use a shutdown hook. Jothi, the idea of CombineFileInputFormat is good, but I guess I have to write something of my own to make it work in my case. Thanks guys for the suggestions... but I feel we should have some support from the framework to merge the output of a map-only job so that we don't get a large number of smaller files. Sometimes you just don't want to run reducers and unnecessarily transfer a whole lot of data across the network. Thanks, Tarandeep On Wed, Jun 17, 2009 at 7:57 PM, jason hadoop jason.had...@gmail.com wrote: You can open your sequence file in the mapper configure method, write to it in your map, and close it in the mapper close method. Then you end up with 1 sequence file per map. I am making an assumption that each key,value to your map somehow represents a single xml file/item. On Wed, Jun 17, 2009 at 7:29 PM, Jothi Padmanabhan joth...@yahoo-inc.com wrote: You could look at CombineFileInputFormat to generate a single split out of several files. Your partitioner would be able to assign keys to specific reducers, but you would not have control over which node a given reduce task will run on. Jothi On 6/18/09 5:10 AM, Tarandeep Singh tarand...@gmail.com wrote: Hi, Can I restrict the output of mappers running on a node to go to reducer(s) running on the same node? Let me explain why I want to do this- I am converting a huge number of XML files into SequenceFiles. So theoretically I don't even need reducers; mappers would read xml files and output SequenceFiles. But the problem with this approach is I will end up getting a huge number of small output files. To avoid generating a large number of smaller files, I can use Identity reducers. But by running reducers, I am unnecessarily transferring data over the network. I ran a test case using a small subset of my data (~90GB). With a map-only job, my cluster finished the conversion in only 6 minutes, but with a map plus Identity-reducer job it takes around 38 minutes. I have to process close to a terabyte of data, so I was thinking of faster alternatives- * Writing a custom OutputFormat * Somehow restrict output of mappers running on a node to go to reducers running on the same node. Maybe I can write my own partitioner (simple), but I am not sure how Hadoop's framework assigns partitions to reduce tasks. Any pointers? Or is this not possible at all? Thanks, Tarandeep -- Pro Hadoop, a book to guide you from beginner to hadoop mastery, http://www.amazon.com/dp/1430219424?tag=jewlerymall www.prohadoopbook.com a community for Hadoop Professionals
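To make Jason's suggestion concrete, here is a minimal sketch against the old mapred API (the class name, the fixed "record" key, and the use of mapred.task.id for unique file names are illustrative choices, not from the thread):

import java.io.IOException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

public class XmlToSeqMapper extends MapReduceBase
    implements Mapper<LongWritable, Text, Text, Text> {

  private SequenceFile.Writer writer;

  public void configure(JobConf job) {
    try {
      FileSystem fs = FileSystem.get(job);
      // one file per map task; the task attempt id keeps names unique
      Path out = new Path(FileOutputFormat.getOutputPath(job),
          "seq-" + job.get("mapred.task.id"));
      writer = SequenceFile.createWriter(fs, job, out, Text.class, Text.class);
    } catch (IOException e) {
      throw new RuntimeException("cannot create sequence file", e);
    }
  }

  public void map(LongWritable key, Text value,
      OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
    // write directly instead of output.collect(); the value is assumed
    // to hold one converted XML record
    writer.append(new Text("record"), value);
  }

  public void close() throws IOException {
    writer.close();
  }
}

Note that writing directly like this bypasses the OutputCommitter, so speculative execution should be disabled for the job or duplicate task attempts may leave stray files.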
Re: Neither OOM Java Heap Space nor GC Overhead Limit Exceeded
You can pass -D mapred.child.java.opts=-Xmx[some value] as part of your job, or set it in your job conf before your task is submitted. Then the per-task jvm's will use that string as part of the jvm initialization parameter set. The distributed cache is used for making files and archives that are stored in hdfs available in the local file system working area of your tasks. The GenericOptionsParser class that most Hadoop user interfaces use provides a couple of command line arguments that allow you to specify local file system files which are copied into hdfs and then made available as stated above; -files and -libjars are the two arguments. My book has a solid discussion and example set for the distributed cache in chapter 5. On Thu, Jun 18, 2009 at 1:45 PM, akhil1988 akhilan...@gmail.com wrote: Hi Jason! I finally found out that there was some problem in reserving the HEAPSIZE which I have resolved now. Actually we cannot change the HADOOP_HEAPSIZE using export from our user account after we have started Hadoop; it has to be changed by root. I have a user account on the cluster and I was trying to change the HADOOP_HEAPSIZE from my user account using 'export', which had no effect. So I had to request my cluster administrator to increase the HADOOP_HEAPSIZE in hadoop-env.sh and then restart hadoop. Now the program is running absolutely fine. Thanks for your help. One thing that I would like to ask you is: can we use DistributedCache for transferring directories to the local cache of the tasks? Thanks, Akhil akhil1988 wrote: Hi Jason! Thanks for going with me to solve my problem. To restate things and make it easier to understand: I am working in local mode in the directory which contains the job jar and also the Config and Data directories. I just removed the following three statements from my code: DistributedCache.addCacheFile(new URI("/home/akhil1988/Ner/OriginalNer/Data/"), conf); DistributedCache.addCacheFile(new URI("/home/akhil1988/Ner/OriginalNer/Config/"), conf); DistributedCache.createSymlink(conf); The program executes till the same point as before and terminates. That means the above three statements are of no use while working in local mode. In local mode, the working directory for the mapreduce tasks becomes the current working directory in which you started the hadoop command to execute the job. Since I have removed the DistributedCache.addCacheFile statements, there should be no issue whether I am giving a file name or a directory name as argument to it. Now it seems to me that there is some problem in reading the binary file using binaryRead. Please let me know if I am going wrong anywhere. Thanks, Akhil jason hadoop wrote: I have only ever used the distributed cache to add files, including binary files such as shared libraries. It looks like you are adding a directory. The DistributedCache is not generally used for passing data, but for passing file names. The files must be stored in a shared file system (hdfs for simplicity) already. The distributed cache makes the names available to the tasks, and the files are extracted from hdfs and stored in the task local work area on each task tracker node. It looks like you may be storing the contents of your files in the distributed cache. On Wed, Jun 17, 2009 at 6:56 AM, akhil1988 akhilan...@gmail.com wrote: Thanks Jason. I went inside the code of the statement and found out that it eventually makes some binaryRead function call to read a binary file and there it gets stuck.
Do you know whether there is any problem in giving a binary file for addition to the distributed cache? In the statement DistributedCache.addCacheFile(new URI("/home/akhil1988/Ner/OriginalNer/Data/"), conf); Data is a directory which contains some text as well as some binary files. In the statement Parameters.readConfigAndLoadExternalData("Config/allLayer1.config"); I can see (in the output messages) that it is able to read the text files, but it gets stuck at the binary files. So I think the problem is: it is not able to read the binary files, which either have not been transferred to the cache, or a binary file cannot be read. Do you know the solution to this? Thanks, Akhil jason hadoop wrote: Something is happening inside of your (Parameters.readConfigAndLoadExternalData("Config/allLayer1.config");) code, and the framework is killing the job for not heartbeating for 600 seconds. On Tue, Jun 16, 2009 at 8:32 PM, akhil1988 akhilan...@gmail.com wrote: One more thing: finally it terminates there (after some time) by giving the final exception: java.io.IOException: Job failed! at org.apache.hadoop.mapred.JobClient.runJob(JobClient.java:1217) at
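As a sketch of both of Jason's points in driver code (the heap size, paths, and class name below are hypothetical; for the directory question, the usual route is to pack the directory into an archive and use addCacheArchive, which is unpacked in the task's local work area):

import java.net.URI;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.mapred.JobConf;

public class SubmitWithCache {
  public static void main(String[] args) throws Exception {
    JobConf conf = new JobConf(SubmitWithCache.class);

    // same effect as passing -D mapred.child.java.opts=-Xmx1024m
    conf.set("mapred.child.java.opts", "-Xmx1024m");

    // cache individual files that already live in HDFS
    DistributedCache.addCacheFile(
        new URI("/user/akhil/Ner/Config/allLayer1.config"), conf);
    // a directory can be zipped and shipped as an archive instead
    DistributedCache.addCacheArchive(new URI("/user/akhil/Ner/Data.zip"), conf);
    DistributedCache.createSymlink(conf);

    // ... set mapper, input and output paths, then JobClient.runJob(conf)
  }
}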
Re: Trying to setup Cluster
Divij, In regards to your ssh problem -- 1) make sure that your authorized_keys file contains the public key (not the private key). 2) make sure the permissions on the .ssh directory and the files within it are correct. They should look something like this:

dvrya...@ubuntu:~$ ls -la .ssh/
total 24
drwx------  2 dvryaboy dvryaboy 4096 2009-06-18 09:28 .
drwxr-xr-x 58 dvryaboy dvryaboy 4096 2009-06-18 17:18 ..
-rw-r--r--  1 dvryaboy dvryaboy  397 2009-06-18 09:28 authorized_keys
-rw-------  1 dvryaboy dvryaboy 1675 2009-06-18 09:27 id_rsa
-rw-r--r--  1 dvryaboy dvryaboy  397 2009-06-18 09:27 id_rsa.pub
-rw-r--r--  1 dvryaboy dvryaboy 1768 2009-06-18 09:31 known_hosts

(note that the private key and the directory are restricted to my user). -D

On Fri, Jun 19, 2009 at 9:04 AM, Divij Durve divij.t...@gmail.com wrote: Thanks for the info aaron. I think the $HADOOP_HOME does get resolved but I will change it anyway. I have tried all possible methods of getting the passwordless ssh to work, even done cat <file where the generated key is saved> >> <authorized keys file>. It still asks for the pass for ssh localhost. I moved the job tracker to the main node and then the cluster started working. I did a data load, but when I sent out a query like select count(1) from <table name> it gave me an error; the query select * from <table name> worked just fine. I really can't figure out what's going wrong. I sent a mail out with the error after this mail. Also let me know if there is any added info I need to give to help with a solution. Thanks Divij

On Thu, Jun 18, 2009 at 4:32 PM, Aaron Kimball aa...@cloudera.com wrote: Are you encountering specific problems? I don't think that hadoop's config files will evaluate environment variables, so $HADOOP_HOME won't be interpreted correctly. For passwordless ssh, see http://rcsg-gsir.imsb-dsgi.nrc-cnrc.gc.ca/documents/internet/node31.html or just check the manpage for ssh-keygen. - Aaron

On Wed, Jun 17, 2009 at 9:30 AM, Divij Durve divij.t...@gmail.com wrote: I'm trying to setup a cluster with 3 different machines running Fedora. I can't get them to log into the localhost without the password, but that's the least of my worries at the moment. I am posting my config files and the master and slave files; let me know if anyone can spot a problem with the configs...

Hadoop-site.xml:

<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>

<!-- Put site-specific property overrides in this file. -->

<configuration>

<property>
<name>dfs.data.dir</name>
<value>$HADOOP_HOME/dfs-data</value>
<final>true</final>
</property>

<property>
<name>dfs.name.dir</name>
<value>$HADOOP_HOME/dfs-name</value>
<final>true</final>
</property>

<property>
<name>hadoop.tmp.dir</name>
<value>$HADOOP_HOME/hadoop-tmp</value>
<description>A base for other temporary directories.</description>
</property>

<property>
<name>fs.default.name</name>
<value>hdfs://gobi.something.something:54310</value>
<description>The name of the default file system. A URI whose scheme and authority determine the FileSystem implementation. The uri's scheme determines the config property (fs.SCHEME.impl) naming the FileSystem implementation class. The uri's authority is used to determine the host, port, etc. for a FileSystem.</description>
</property>

<property>
<name>mapred.job.tracker</name>
<value>kalahari.something.something:54311</value>
<description>The host and port that the MapReduce job tracker runs at. If "local", then jobs are run in-process as a single map and reduce task.</description>
</property>

<property>
<name>mapred.system.dir</name>
<value>$HADOOP_HOME/mapred-system</value>
<final>true</final>
</property>

<property>
<name>dfs.replication</name>
<value>1</value>
<description>Default block replication. The actual number of replications can be specified when the file is created. The default is used if replication is not specified at create time.</description>
</property>

<property>
<name>mapred.local.dir</name>
<value>$HADOOP_HOME/mapred-local</value>
<name>dfs.replication</name>
<value>1</value>
</property>

</configuration>

Slave: kongur.something.something

Master: kalahari.something.something

I execute the start-dfs.sh command from gobi.something.something. Is there any other info that I should provide in order to help? Also, Kongur is where I'm running the data node; the master file on kongur should have localhost in it, right? thanks for the help Divij
Re: Announcing CloudBase-1.3.1 release
On Wed, Jun 17, 2009 at 6:33 PM, zsongbo zson...@gmail.com wrote: How about the index of CloudBase? CloudBase has support for hash indexing. We have tested it with our production data and found it very useful, especially if you want to index on a date column and later want to query on specific dates or date ranges. I have explained indexing in detail here- http://www.mail-archive.com/core-user@hadoop.apache.org/msg10708.html In the CloudBase-1.3 release, index updating was added so you can keep your index up to date with data additions. Let us know if you want to know more. Thanks, Tarandeep On Wed, Jun 17, 2009 at 4:16 AM, Ru, Yanbo y...@business.com wrote: Hi, We have released the 1.3.1 version of CloudBase on sourceforge- https://sourceforge.net/projects/cloudbase CloudBase is a data warehouse system for Terabyte/Petabyte scale analytics. It is built on top of the Map-Reduce architecture. It allows you to query flat log files using ANSI SQL. Please give it a try and send us your feedback. Thanks, Yanbo Release notes - New Features: * CREATE CSV tables - One can create tables on top of data in CSV (Comma Separated Values) format and query them using SQL. The current implementation doesn't accept CSV records which span multiple lines. Data may not be processed correctly if a field contains embedded line-breaks. Please visit http://cloudbase.sourceforge.net/index.html#userDoc for the detailed specification of the CSV format. Bug fixes: * Aggregate function 'AVG' returns the same value as 'SUM' function * If a query has multiple aliases, only the last alias works
Re: A simple performance benchmark for Hadoop, Hive and Pig
On Thu, Jun 18, 2009 at 9:29 PM, Zheng Shao zs...@facebook.com wrote: Yuntao Jia, our intern this summer, did a simple performance benchmark for Hadoop, Hive and Pig based on the queries in the SIGMOD 2009 paper: A Comparison of Approaches to Large-Scale Data Analysis. It should be noted that no one on the Pig team was involved in setting up the benchmarks and the queries don't follow the Pig cookbook suggestions for writing efficient queries, so these results should be considered *extremely* preliminary. Furthermore, I can't see any way that Hive should be able to beat raw map/reduce, since Hive uses map/reduce to run the job. In the future, it would be better to involve the respective communities (mapreduce-dev and pig-dev) well before pushing benchmark results out to the user lists. The Hadoop project, which includes all three subprojects, needs to be a cooperative community that is trying to build the best software we can. Getting benchmark numbers is good, but it is better done in a collaborative manner. -- Owen
Re: Multicluster Communication
On 6/19/09 3:49 AM, Harish Mallipeddi harish.mallipe...@gmail.com wrote: Why do you want to do this in the first place? It seems like you want cluster1 to be a plain HDFS cluster and cluster2 to be a mapred cluster. Doing something like that will be disastrous - Hadoop is all about sending computation closer to your data. If you don't want that, you need not even use hadoop. Given some of the limitations with HDFS (quota operability, security), I can easily see why it would be desirable to have static data coming from one grid while doing computation/intermediate outputs/real output to another. Using performance as your sole metric of viability is a bigger disaster waiting to happen. "Sure, we crashed the file system, but look how fast it went down in flames!"
Re: Multicluster Communication
On Fri, Jun 19, 2009 at 10:37 PM, Allen Wittenauer a...@yahoo-inc.com wrote: On 6/19/09 3:49 AM, Harish Mallipeddi harish.mallipe...@gmail.com wrote: Why do you want to do this in the first place? It seems like you want cluster1 to be a plain HDFS cluster and cluster2 to be a mapred cluster. Doing something like that will be disastrous - Hadoop is all about sending computation closer to your data. If you don't want that, you need not even use hadoop. Given some of the limitations with HDFS (quota operability, security), I can easily see why it would be desirable to have static data coming from one grid while doing computation/intermediate outputs/real output to another. Using performance as your sole metric of viability is a bigger disaster waiting to happen. "Sure, we crashed the file system, but look how fast it went down in flames!" Well, apart from doing a distcp between the 2 clusters periodically, I don't see how this can be done in a way that would yield acceptable performance. -- Harish Mallipeddi http://blog.poundbang.in
Re: sleep 60 between start-dfs.sh and putting files. Is it normal?
Hey Pavel, It's also worth checking the number of data nodes that have registered with the name node, depending on what you're trying to do when HDFS is ready. Try this:

hadoop dfsadmin -report | grep "Datanodes available" | awk '{ print $3 }'

- or -

MIN_NODES=5
MAX_RETRIES=15
counter=0
while [ `hadoop dfsadmin -report | grep "Datanodes available" | awk '{ print $3 }'` -ne $MIN_NODES ]
do
  sleep 2
  counter=$((counter+1))
  if [ $counter -gt $MAX_RETRIES ]
  then
    echo "Not enough data nodes registered!"
    exit 1
  fi
done

If you try to write HDFS data immediately after the name node is out of safe mode, you might get replication errors if data nodes haven't registered yet. Alex On Fri, Jun 19, 2009 at 6:21 AM, Todd Lipcon t...@cloudera.com wrote: Hi Pavel, You should use hadoop dfsadmin -safemode wait after starting your cluster. This will wait for the namenode to exit safe mode so you can begin making modifications. -Todd On Fri, Jun 19, 2009 at 9:03 AM, pavel kolodin pavelkolo...@gmail.com wrote: Hello. How can I ensure that the cluster is up? Right now I am using sleep 60 between start-dfs.sh and putting files into the input directory... Thanks.
Re: A simple performance benchmark for Hadoop, Hive and Pig
These numbers are definitely preliminary and the reason that we sent them out was to involve the community from the get-go and have them critique this work. The mistake, though, was sending this out on the users list as opposed to the dev lists. Regarding "better than map/reduce": I think the number is better than the particular way the query was implemented in the SIGMOD paper. It is more of a reflection of the implementation there as opposed to map/reduce in general. In keeping with Owen's comments, we should move this discussion to the dev lists; users is not an appropriate forum for it. Ashish - Original Message - From: Owen O'Malley owen.omal...@gmail.com To: core-user@hadoop.apache.org core-user@hadoop.apache.org; pig-u...@hadoop.apache.org pig-u...@hadoop.apache.org; hive-u...@hadoop.apache.org hive-u...@hadoop.apache.org Sent: Fri Jun 19 10:03:06 2009 Subject: Re: A simple performance benchmark for Hadoop, Hive and Pig On Thu, Jun 18, 2009 at 9:29 PM, Zheng Shao zs...@facebook.com wrote: Yuntao Jia, our intern this summer, did a simple performance benchmark for Hadoop, Hive and Pig based on the queries in the SIGMOD 2009 paper: A Comparison of Approaches to Large-Scale Data Analysis. It should be noted that no one on the Pig team was involved in setting up the benchmarks and the queries don't follow the Pig cookbook suggestions for writing efficient queries, so these results should be considered *extremely* preliminary. Furthermore, I can't see any way that Hive should be able to beat raw map/reduce, since Hive uses map/reduce to run the job. In the future, it would be better to involve the respective communities (mapreduce-dev and pig-dev) well before pushing benchmark results out to the user lists. The Hadoop project, which includes all three subprojects, needs to be a cooperative community that is trying to build the best software we can. Getting benchmark numbers is good, but it is better done in a collaborative manner. -- Owen
RE: A simple performance benchmark for Hadoop, Hive and Pig
I completely agree with Owen on this point. Let's move all discussions to dev lists and jira: http://issues.apache.org/jira/browse/HIVE-396 I was confused by seeing so many automatic emails in the dev mailing list. Zheng -Original Message- From: Owen O'Malley [mailto:owen.omal...@gmail.com] Sent: Friday, June 19, 2009 10:03 AM To: core-user@hadoop.apache.org; pig-u...@hadoop.apache.org; hive-u...@hadoop.apache.org Subject: Re: A simple performance benchmark for Hadoop, Hive and Pig On Thu, Jun 18, 2009 at 9:29 PM, Zheng Shao zs...@facebook.com wrote: Yuntao Jia, our intern this summer, did a simple performance benchmark for Hadoop, Hive and Pig based on the queries in the SIGMOD 2009 paper: A Comparison of Approaches to Large-Scale Data Analysis It should be noted that no one on the Pig team was involved in setting up the benchmarks and the queries don't follow the Pig cookbook suggestions for writing efficient queries, so these results should be considered *extremely* preliminary. Furthermore, I can't see any way that Hive should be able to beat raw map/reduce, since Hive uses map/reduce to run the job. In the future, it would be better to involve the respective communities (mapreduce-dev and pig-dev) far before pushing benchmark results out to the user lists. The Hadoop project, which includes all three subprojects, needs to be a cooperative community that is trying to build the best software we can. Getting benchmark numbers is good, but are better done in a collaborative manner. -- Owen
Announcing CloudBase-1.3.1 release
CloudBase is a data warehouse system for Terabyte/Petabyte scale analytics. It is built on top of hadoop. It allows you to query flat files using ANSI SQL. We have released the 1.3.1 version of CloudBase on sourceforge- https://sourceforge.net/projects/cloudbase Please give it a try and send us your feedback. You can follow CloudBase related discussion on the Google Groups mailing list: cloudbase-us...@googlegroups.com Release notes - New Features: * CREATE CSV tables - One can create tables on top of data in CSV (Comma Separated Values) format and query them using SQL. The current implementation doesn't accept CSV records which span multiple lines. Data may not be processed correctly if a field contains embedded line-breaks. Please visit http://cloudbase.sourceforge.net/index.html#userDoc for the detailed specification of the CSV format. Bug fixes: * Aggregate function 'AVG' returns the same value as 'SUM' function * If a query has multiple aliases, only the last alias works
Re: :!!
Yes, any machine that has network access to the cluster can read/write to hdfs. It does not need to be part of the cluster or running any hadoop daemons. Such a client just needs to have hadoop set up on it and the configuration details for contacting the namenode. If using the hadoop command line, this means that the hadoop xml config files have to be set up. If you embed the hadoop jars in your own app, you have to provide the config information via files or programmatically. Essentially, the client only needs to know how to contact the namenode. The namenode will automatically tell the hdfs client how to communicate with each datanode for storing or getting data. On 6/14/09 8:54 PM, Sugandha Naolekar sugandha@gmail.com wrote: Hello! I want to execute all my code on a machine that's remote (not a part of the hadoop cluster). This code includes file transfers between any nodes (remote, within the hadoop cluster, or within the same LAN) and HDFS. I will have to simply write code for this. Is it possible? Thanks, Regards- -- Regards! Sugandha
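A minimal sketch of such a remote client, with a hypothetical host and hypothetical paths (fs.default.name must match the value in the cluster's own config):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class RemoteHdfsClient {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // point the client at the remote namenode (host and port are made up)
    conf.set("fs.default.name", "hdfs://namenode.example.com:54310");

    FileSystem fs = FileSystem.get(conf);
    // copy a local file into HDFS; the namenode directs the client
    // to the right datanodes automatically
    fs.copyFromLocalFile(new Path("/tmp/report.txt"),
                         new Path("/user/sugandha/report.txt"));
    fs.close();
  }
}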
Re: multiple file input
For the sake of simplicity I have reduced my input to two files: 1. FileA 2. FileB. As I said earlier, I want to compare every record of FileA against every record in FileB. I know this is n^2, but this is the process. I wrote a simple InputFormat and RecordReader. It seems each file is read serially, one after another. How can my record reader have a reference to both files at the same time so that I can create the cross list of FileA and FileB for the mapper? Basically the way I see it is to give the mapper one record from FileA and all records from FileB so that the mapper can do the n^2 comparisons and forward the results to the reducer. thanks pmg wrote: Thanks owen. Are there any examples that I can look at? owen.omalley wrote: On Jun 18, 2009, at 10:56 AM, pmg wrote: Each line from FileA gets compared with every line from FileB1, FileB2 etc. FileB1, FileB2 etc. are in a different input directory. In the general case, I'd define an InputFormat that takes two directories, computes the input splits for each directory and generates a new list of InputSplits that is the cross-product of the two lists. So instead of FileSplit, it would use a FileSplitPair that gives the FileSplit for dir1 and the FileSplit for dir2, and the record reader would return a TextPair with left and right records (i.e. lines). Clearly, you read the first line of split1 and cross it by each line from split2, then move to the second line of split1 and process each line from split2, etc. You'll need to ensure that you don't overwhelm the system with too many input splits (i.e. maps). Also don't forget that N^2/M grows much faster with the size of the input (N) than the M machines can handle in a fixed amount of time. Two input directories: 1. input1 directory with a single file of 600K records - FileA 2. input2 directory segmented into different files with 2 million records - FileB1, FileB2 etc. In this particular case, it would be right to load all of FileA into memory and process the chunks of FileB/part-*. Then it would be much faster than needing to re-read the file over and over again, but otherwise it would be the same. -- Owen -- View this message in context: http://www.nabble.com/multiple-file-input-tp24095358p24119228.html Sent from the Hadoop core-user mailing list archive at Nabble.com.
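FileSplitPair here is a class Owen is proposing, not something shipped with Hadoop; a rough sketch of what it could look like in the old mapred API follows (the locality choice is one possible policy, not from the thread). The custom InputFormat's getSplits would emit the cross product of these pairs, and its RecordReader would open both wrapped splits:

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;

public class FileSplitPair implements InputSplit {
  private FileSplit left;   // a split from dir1 (the FileA side)
  private FileSplit right;  // a split from dir2 (the FileB side)

  public FileSplitPair() {  // no-arg constructor needed for deserialization
    left = new FileSplit((Path) null, 0, 0, (String[]) null);
    right = new FileSplit((Path) null, 0, 0, (String[]) null);
  }

  public FileSplitPair(FileSplit left, FileSplit right) {
    this.left = left;
    this.right = right;
  }

  public long getLength() throws IOException {
    return left.getLength() + right.getLength();
  }

  public String[] getLocations() throws IOException {
    return left.getLocations();  // bias scheduling toward the left split
  }

  public void write(DataOutput out) throws IOException {
    left.write(out);
    right.write(out);
  }

  public void readFields(DataInput in) throws IOException {
    left.readFields(in);
    right.readFields(in);
  }
}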
Re: multiple file input
On Fri, Jun 19, 2009 at 2:41 PM, pmg parmod.me...@gmail.com wrote: For the sake of simplicity I have reduced my input to two files: 1. FileA 2. FileB. As I said earlier, I want to compare every record of FileA against every record in FileB. I know this is n^2, but this is the process. I wrote a simple InputFormat and RecordReader. It seems each file is read serially, one after another. How can my record reader have a reference to both files at the same time so that I can create the cross list of FileA and FileB for the mapper? Basically the way I see it is to give the mapper one record from FileA and all records from FileB so that the mapper can do the n^2 comparisons and forward the results to the reducer. It will be hard (and inefficient) to do this in a Mapper using some custom input format. What you can do is use the semi-join technique- Since FileA is smaller, run a map reduce job that will output a key,value pair where the key is the field or set of fields on which you want to do the comparison and the value is the whole line. The reducer is simply an Identity reducer which writes the files. So your FileA has been partitioned on the field(s). You can also create a bloom filter on this field and store it in the Distributed Cache. Now read FileB, load the bloom filter into memory and see if the field from a line of FileB is present in the bloom filter; if yes, emit the key,value pair, else not. At the reducers, you get the contents of FileB partitioned just like the contents of FileA were partitioned, and at a particular reducer you get lines sorted on the field you want to do the comparison on. At this point you read the contents of FileA that reached this reducer and, since its contents were sorted as well, you can quickly go over the two lists. -Tarandeep thanks pmg wrote: Thanks owen. Are there any examples that I can look at? owen.omalley wrote: On Jun 18, 2009, at 10:56 AM, pmg wrote: Each line from FileA gets compared with every line from FileB1, FileB2 etc. FileB1, FileB2 etc. are in a different input directory. In the general case, I'd define an InputFormat that takes two directories, computes the input splits for each directory and generates a new list of InputSplits that is the cross-product of the two lists. So instead of FileSplit, it would use a FileSplitPair that gives the FileSplit for dir1 and the FileSplit for dir2, and the record reader would return a TextPair with left and right records (i.e. lines). Clearly, you read the first line of split1 and cross it by each line from split2, then move to the second line of split1 and process each line from split2, etc. You'll need to ensure that you don't overwhelm the system with too many input splits (i.e. maps). Also don't forget that N^2/M grows much faster with the size of the input (N) than the M machines can handle in a fixed amount of time. Two input directories: 1. input1 directory with a single file of 600K records - FileA 2. input2 directory segmented into different files with 2 million records - FileB1, FileB2 etc. In this particular case, it would be right to load all of FileA into memory and process the chunks of FileB/part-*. Then it would be much faster than needing to re-read the file over and over again, but otherwise it would be the same. -- Owen -- View this message in context: http://www.nabble.com/multiple-file-input-tp24095358p24119228.html Sent from the Hadoop core-user mailing list archive at Nabble.com.
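Hadoop 0.20 ships a bloom filter implementation under org.apache.hadoop.util.bloom that fits this technique; a minimal sketch with made-up sizing (the filter is a Writable, so it can be serialized to HDFS and shipped to the FileB job via the DistributedCache):

import org.apache.hadoop.util.bloom.BloomFilter;
import org.apache.hadoop.util.bloom.Key;
import org.apache.hadoop.util.hash.Hash;

public class BloomSketch {
  public static void main(String[] args) {
    // FileA side: add every join key (the vector size and hash count here
    // are guesses that should be tuned to the expected key count)
    BloomFilter filter = new BloomFilter(8 * 1024 * 1024, 5, Hash.MURMUR_HASH);
    filter.add(new Key("123".getBytes()));

    // FileB side: a negative test is definite, a positive may be false
    if (filter.membershipTest(new Key("123".getBytes()))) {
      System.out.println("key may be present in FileA - emit the pair");
    }
  }
}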
Re: multiple file input
Thanks tarandeep. Correct me if I am wrong: when I map FileA, the mapper creates key,value pairs and sends them across to the reducer. If so, then how can I compare when FileB is not even mapped yet? Tarandeep wrote: On Fri, Jun 19, 2009 at 2:41 PM, pmg parmod.me...@gmail.com wrote: For the sake of simplicity I have reduced my input to two files: 1. FileA 2. FileB. As I said earlier, I want to compare every record of FileA against every record in FileB. I know this is n^2, but this is the process. I wrote a simple InputFormat and RecordReader. It seems each file is read serially, one after another. How can my record reader have a reference to both files at the same time so that I can create the cross list of FileA and FileB for the mapper? Basically the way I see it is to give the mapper one record from FileA and all records from FileB so that the mapper can do the n^2 comparisons and forward the results to the reducer. It will be hard (and inefficient) to do this in a Mapper using some custom input format. What you can do is use the semi-join technique- Since FileA is smaller, run a map reduce job that will output a key,value pair where the key is the field or set of fields on which you want to do the comparison and the value is the whole line. The reducer is simply an Identity reducer which writes the files. So your FileA has been partitioned on the field(s). You can also create a bloom filter on this field and store it in the Distributed Cache. Now read FileB, load the bloom filter into memory and see if the field from a line of FileB is present in the bloom filter; if yes, emit the key,value pair, else not. At the reducers, you get the contents of FileB partitioned just like the contents of FileA were partitioned, and at a particular reducer you get lines sorted on the field you want to do the comparison on. At this point you read the contents of FileA that reached this reducer and, since its contents were sorted as well, you can quickly go over the two lists. -Tarandeep thanks pmg wrote: Thanks owen. Are there any examples that I can look at? owen.omalley wrote: On Jun 18, 2009, at 10:56 AM, pmg wrote: Each line from FileA gets compared with every line from FileB1, FileB2 etc. FileB1, FileB2 etc. are in a different input directory. In the general case, I'd define an InputFormat that takes two directories, computes the input splits for each directory and generates a new list of InputSplits that is the cross-product of the two lists. So instead of FileSplit, it would use a FileSplitPair that gives the FileSplit for dir1 and the FileSplit for dir2, and the record reader would return a TextPair with left and right records (i.e. lines). Clearly, you read the first line of split1 and cross it by each line from split2, then move to the second line of split1 and process each line from split2, etc. You'll need to ensure that you don't overwhelm the system with too many input splits (i.e. maps). Also don't forget that N^2/M grows much faster with the size of the input (N) than the M machines can handle in a fixed amount of time. Two input directories: 1. input1 directory with a single file of 600K records - FileA 2. input2 directory segmented into different files with 2 million records - FileB1, FileB2 etc. In this particular case, it would be right to load all of FileA into memory and process the chunks of FileB/part-*. Then it would be much faster than needing to re-read the file over and over again, but otherwise it would be the same.
-- Owen -- View this message in context: http://www.nabble.com/multiple-file-input-tp24095358p24119228.html Sent from the Hadoop core-user mailing list archive at Nabble.com. -- View this message in context: http://www.nabble.com/multiple-file-input-tp24095358p24119864.html Sent from the Hadoop core-user mailing list archive at Nabble.com.
Re: multiple file input
Oh my bad, I was not clear- For FileB, you will be running a second map reduce job. In the mapper, you can use the bloom filter created in the first map reduce job (if you wish to use it) to eliminate the lines whose keys don't match. The mapper will emit a key,value pair, where the key is the field on which you want to do the comparison and the value is the whole line. When the key,value pairs go to the reducers, you have lines from FileB sorted on the field you want to use for comparison. Now you can read the contents of FileA (note that if you ran the first job with N reducers, you will have N partitions of FileA and you want to read only the partition meant for this reducer). The contents of FileA are also sorted on the field, so now you can easily compare the lines from the two files. CloudBase- cloudbase.sourceforge.net has code for doing joins in this fashion. Let me know if you need more clarification. -Tarandeep On Fri, Jun 19, 2009 at 3:45 PM, pmg parmod.me...@gmail.com wrote: Thanks tarandeep. Correct me if I am wrong: when I map FileA, the mapper creates key,value pairs and sends them across to the reducer. If so, then how can I compare when FileB is not even mapped yet? Tarandeep wrote: On Fri, Jun 19, 2009 at 2:41 PM, pmg parmod.me...@gmail.com wrote: For the sake of simplicity I have reduced my input to two files: 1. FileA 2. FileB. As I said earlier, I want to compare every record of FileA against every record in FileB. I know this is n^2, but this is the process. I wrote a simple InputFormat and RecordReader. It seems each file is read serially, one after another. How can my record reader have a reference to both files at the same time so that I can create the cross list of FileA and FileB for the mapper? Basically the way I see it is to give the mapper one record from FileA and all records from FileB so that the mapper can do the n^2 comparisons and forward the results to the reducer. It will be hard (and inefficient) to do this in a Mapper using some custom input format. What you can do is use the semi-join technique- Since FileA is smaller, run a map reduce job that will output a key,value pair where the key is the field or set of fields on which you want to do the comparison and the value is the whole line. The reducer is simply an Identity reducer which writes the files. So your FileA has been partitioned on the field(s). You can also create a bloom filter on this field and store it in the Distributed Cache. Now read FileB, load the bloom filter into memory and see if the field from a line of FileB is present in the bloom filter; if yes, emit the key,value pair, else not. At the reducers, you get the contents of FileB partitioned just like the contents of FileA were partitioned, and at a particular reducer you get lines sorted on the field you want to do the comparison on. At this point you read the contents of FileA that reached this reducer and, since its contents were sorted as well, you can quickly go over the two lists. -Tarandeep thanks pmg wrote: Thanks owen. Are there any examples that I can look at? owen.omalley wrote: On Jun 18, 2009, at 10:56 AM, pmg wrote: Each line from FileA gets compared with every line from FileB1, FileB2 etc. FileB1, FileB2 etc. are in a different input directory. In the general case, I'd define an InputFormat that takes two directories, computes the input splits for each directory and generates a new list of InputSplits that is the cross-product of the two lists. So instead of FileSplit, it would use a FileSplitPair that gives the FileSplit for dir1 and the FileSplit for dir2, and the record reader would return a TextPair with left and right records (i.e. lines). Clearly, you read the first line of split1 and cross it by each line from split2, then move to the second line of split1 and process each line from split2, etc. You'll need to ensure that you don't overwhelm the system with too many input splits (i.e. maps). Also don't forget that N^2/M grows much faster with the size of the input (N) than the M machines can handle in a fixed amount of time. Two input directories: 1. input1 directory with a single file of 600K records - FileA 2. input2 directory segmented into different files with 2 million records - FileB1, FileB2 etc. In this particular case, it would be right to load all of FileA into memory and process the chunks of FileB/part-*. Then it would be much faster than needing to re-read the file over and over again, but otherwise it would be the same. -- Owen -- View this message in context: http://www.nabble.com/multiple-file-input-tp24095358p24119228.html Sent from the Hadoop core-user mailing list archive at Nabble.com. -- View this message in context: http://www.nabble.com/multiple-file-input-tp24095358p24119864.html Sent from the Hadoop core-user mailing list archive at Nabble.com.
Re: multiple file input
Thanks Tarandeep for the prompt reply. Let me give you an example structure of FileA and FileB:

FileA
-----
123 ABC 1

FileB
-----
123 ABC 2
456 BNF 3

Both files are tab delimited. Every record is not simply compared with each record in FileB; there's a heuristic I am going to run for the comparison, scoring the results along with the output. So my output file is like this:

Output
------
123 ABC 1 123 ABC 2 10
123 ABC 1 456 BNF 3 20

The first 3 columns in the output file are from FileA, the next three columns are from FileB, and the last column is their comparison score. So basically you are saying we can use two map/reduce jobs, one for FileA and the other for FileB: map(FileA) -> reduce(FileA) -> map(FileB) -> reduce(FileB). For the first file FileA I map them with (k,v) (I can't use a bloom filter because the comparison between each record from FileA is not a straight comparison with every record in FileB - they are compared using a heuristic and scored for their quantitative comparison and stored). In the FileA reduce I store it in the distributed cache. Once this is done, map FileB in the second map, and in the FileB reduce read in FileA from the distributed cache, do my heuristics for every (k,v) from FileB, and store my result. thanks Tarandeep wrote: Oh my bad, I was not clear- For FileB, you will be running a second map reduce job. In the mapper, you can use the bloom filter created in the first map reduce job (if you wish to use it) to eliminate the lines whose keys don't match. The mapper will emit a key,value pair, where the key is the field on which you want to do the comparison and the value is the whole line. When the key,value pairs go to the reducers, you have lines from FileB sorted on the field you want to use for comparison. Now you can read the contents of FileA (note that if you ran the first job with N reducers, you will have N partitions of FileA and you want to read only the partition meant for this reducer). The contents of FileA are also sorted on the field, so now you can easily compare the lines from the two files. CloudBase- cloudbase.sourceforge.net has code for doing joins in this fashion. Let me know if you need more clarification. -Tarandeep On Fri, Jun 19, 2009 at 3:45 PM, pmg parmod.me...@gmail.com wrote: Thanks tarandeep. Correct me if I am wrong: when I map FileA, the mapper creates key,value pairs and sends them across to the reducer. If so, then how can I compare when FileB is not even mapped yet? Tarandeep wrote: On Fri, Jun 19, 2009 at 2:41 PM, pmg parmod.me...@gmail.com wrote: For the sake of simplicity I have reduced my input to two files: 1. FileA 2. FileB. As I said earlier, I want to compare every record of FileA against every record in FileB. I know this is n^2, but this is the process. I wrote a simple InputFormat and RecordReader. It seems each file is read serially, one after another. How can my record reader have a reference to both files at the same time so that I can create the cross list of FileA and FileB for the mapper? Basically the way I see it is to give the mapper one record from FileA and all records from FileB so that the mapper can do the n^2 comparisons and forward the results to the reducer. It will be hard (and inefficient) to do this in a Mapper using some custom input format. What you can do is use the semi-join technique- Since FileA is smaller, run a map reduce job that will output a key,value pair where the key is the field or set of fields on which you want to do the comparison and the value is the whole line. The reducer is simply an Identity reducer which writes the files. So your FileA has been partitioned on the field(s). You can also create a bloom filter on this field and store it in the Distributed Cache. Now read FileB, load the bloom filter into memory and see if the field from a line of FileB is present in the bloom filter; if yes, emit the key,value pair, else not. At the reducers, you get the contents of FileB partitioned just like the contents of FileA were partitioned, and at a particular reducer you get lines sorted on the field you want to do the comparison on. At this point you read the contents of FileA that reached this reducer and, since its contents were sorted as well, you can quickly go over the two lists. -Tarandeep thanks pmg wrote: Thanks owen. Are there any examples that I can look at? owen.omalley wrote: On Jun 18, 2009, at 10:56 AM, pmg wrote: Each line from FileA gets compared with every line from FileB1, FileB2 etc. FileB1, FileB2 etc. are in a different input directory. In the general case, I'd define an InputFormat that takes two directories, computes the input splits for each directory and generates a new list of InputSplits that is the cross-product of the two lists. So instead of FileSplit, it would use a FileSplitPair that gives the FileSplit for dir1 and the FileSplit for dir2, and the record reader would return a TextPair with
Re: multiple file input
Hey, I think I got your question wrong; my solution won't let you achieve what you intended. Your example made it clear. Since it is a cross product, the contents of one of the files has to be in memory for iteration, but since the size is big, that might not be possible, so how about this solution, which will scale too- First make smaller chunks of your big files (small enough that one chunk can be stored in memory). Hadoop's block size is set to 64MB by default. If this seems ok according to the RAM you have, then simply run an Identity Mapper only job for both Files A and B. The output will be smaller files with the names part-00000, part-00001 etc. For simplicity let us call the chunks of File A: A1, A2, A3... and the chunks of B: B1, B2, B3... Create a file (or write a program that will generate this file) that contains the cross product of these chunks-

A1 B1
A1 B2
A1 B3
..
A2 B1
A2 B2
A2 B3
..

Now run a Map only job (no reducer). Use NLineInputFormat and set N = 1. Give this file as the input to your job. NLineInputFormat will give each mapper a line from this file. So for example, let's say a mapper got the line A1 B3, which means: take the cross product of the contents of chunk A1 and chunk B3. You can read one of the chunks completely and store it in memory as a list or array, and then read the second chunk and do the comparison. Now, as you would have guessed, instead of creating chunks, you can actually calculate offsets in the files (after an interval of say 64MB) and achieve the same effect. HDFS allows seeking to an offset in a file, so that will work too. -Tarandeep On Fri, Jun 19, 2009 at 4:33 PM, pmg parmod.me...@gmail.com wrote: Thanks Tarandeep for the prompt reply. Let me give you an example structure of FileA and FileB:

FileA
-----
123 ABC 1

FileB
-----
123 ABC 2
456 BNF 3

Both files are tab delimited. Every record is not simply compared with each record in FileB; there's a heuristic I am going to run for the comparison, scoring the results along with the output. So my output file is like this:

Output
------
123 ABC 1 123 ABC 2 10
123 ABC 1 456 BNF 3 20

The first 3 columns in the output file are from FileA, the next three columns are from FileB, and the last column is their comparison score. So basically you are saying we can use two map/reduce jobs, one for FileA and the other for FileB: map(FileA) -> reduce(FileA) -> map(FileB) -> reduce(FileB). For the first file FileA I map them with (k,v) (I can't use a bloom filter because the comparison between each record from FileA is not a straight comparison with every record in FileB - they are compared using a heuristic and scored for their quantitative comparison and stored). In the FileA reduce I store it in the distributed cache. Once this is done, map FileB in the second map, and in the FileB reduce read in FileA from the distributed cache, do my heuristics for every (k,v) from FileB, and store my result. thanks Tarandeep wrote: Oh my bad, I was not clear- For FileB, you will be running a second map reduce job. In the mapper, you can use the bloom filter created in the first map reduce job (if you wish to use it) to eliminate the lines whose keys don't match. The mapper will emit a key,value pair, where the key is the field on which you want to do the comparison and the value is the whole line. When the key,value pairs go to the reducers, you have lines from FileB sorted on the field you want to use for comparison. Now you can read the contents of FileA (note that if you ran the first job with N reducers, you will have N partitions of FileA and you want to read only the partition meant for this reducer). The contents of FileA are also sorted on the field, so now you can easily compare the lines from the two files. CloudBase- cloudbase.sourceforge.net has code for doing joins in this fashion. Let me know if you need more clarification. -Tarandeep On Fri, Jun 19, 2009 at 3:45 PM, pmg parmod.me...@gmail.com wrote: Thanks tarandeep. Correct me if I am wrong: when I map FileA, the mapper creates key,value pairs and sends them across to the reducer. If so, then how can I compare when FileB is not even mapped yet? Tarandeep wrote: On Fri, Jun 19, 2009 at 2:41 PM, pmg parmod.me...@gmail.com wrote: For the sake of simplicity I have reduced my input to two files: 1. FileA 2. FileB. As I said earlier, I want to compare every record of FileA against every record in FileB. I know this is n^2, but this is the process. I wrote a simple InputFormat and RecordReader. It seems each file is read serially, one after another. How can my record reader have a reference to both files at the same time so that I can create the cross list of FileA and FileB for the mapper? Basically the way I see it is to give the mapper one record from FileA and all records from FileB so that the mapper can do the n^2 comparisons and forward the results to the reducer. It will be hard (and inefficient) to do this in a Mapper
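A sketch of a driver for that map-only job (the class names, paths, and placeholder mapper body are hypothetical; NLineInputFormat and the mapred.line.input.format.linespermap setting live in org.apache.hadoop.mapred.lib):

import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.lib.NLineInputFormat;

public class CrossProductJob {

  public static class ChunkPairMapper extends MapReduceBase
      implements Mapper<LongWritable, Text, Text, Text> {
    public void map(LongWritable key, Text value,
        OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
      // value is one pair line such as "A1 B3"; open chunk A1, hold it
      // in memory, stream chunk B3 past it, and emit scored pairs here
      output.collect(value, new Text(""));  // placeholder emit
    }
  }

  public static void main(String[] args) throws Exception {
    JobConf conf = new JobConf(CrossProductJob.class);
    conf.setJobName("chunk-pair cross product");

    conf.setInputFormat(NLineInputFormat.class);
    conf.setInt("mapred.line.input.format.linespermap", 1);  // one pair per map
    conf.setNumReduceTasks(0);  // map-only

    conf.setMapperClass(ChunkPairMapper.class);
    conf.setOutputKeyClass(Text.class);
    conf.setOutputValueClass(Text.class);

    FileInputFormat.setInputPaths(conf, new Path("/user/pmg/pairs.txt"));
    FileOutputFormat.setOutputPath(conf, new Path("/user/pmg/scores"));
    JobClient.runJob(conf);
  }
}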
Re: multiple file input
First make smaller chunks of your big files (small enough that one chunk can be stored in memory). Hadoop's block size is set to 64MB by default. If this seems ok according to the RAM you have, then simply run an Identity Mapper only job for both Files A and B. The output will be smaller files with the names part-00000, part-00001 etc. For simplicity let us call the chunks of File A: A1, A2, A3... and the chunks of B: B1, B2, B3...

I am planning to run this on Amazon Elastic MapReduce with large CPU instances, so RAM I think would not be a problem. I can have smaller input files outside map/reduce, so I guess we don't have to run this phase to get small file chunks A1, A2, A3... and chunks of B: B1, B2, B3...

Create a file (or write a program that will generate this file) that contains the cross product of these chunks-

A1 B1
A1 B2
A1 B3
..
A2 B1
A2 B2
A2 B3
..

Correct me if I am wrong: the actual FileA that gets divided into chunks A1, A2, ... has around 600K records. FileB that gets divided into B1, B2, ... has around 2 million records. So I guess we are looking at a file record size of the cartesian product of 600K * 2 million. We are looking at petabytes of data. This would be a hard sell :)

Tarandeep wrote: Hey, I think I got your question wrong; my solution won't let you achieve what you intended. Your example made it clear. Since it is a cross product, the contents of one of the files has to be in memory for iteration, but since the size is big, that might not be possible, so how about this solution, which will scale too- First make smaller chunks of your big files (small enough that one chunk can be stored in memory). Hadoop's block size is set to 64MB by default. If this seems ok according to the RAM you have, then simply run an Identity Mapper only job for both Files A and B. The output will be smaller files with the names part-00000, part-00001 etc. For simplicity let us call the chunks of File A: A1, A2, A3... and the chunks of B: B1, B2, B3... Create a file (or write a program that will generate this file) that contains the cross product of these chunks- A1 B1, A1 B2, A1 B3, .. A2 B1, A2 B2, A2 B3, .. Now run a Map only job (no reducer). Use NLineInputFormat and set N = 1. Give this file as the input to your job. NLineInputFormat will give each mapper a line from this file. So for example, let's say a mapper got the line A1 B3, which means: take the cross product of the contents of chunk A1 and chunk B3. You can read one of the chunks completely and store it in memory as a list or array, and then read the second chunk and do the comparison. Now, as you would have guessed, instead of creating chunks, you can actually calculate offsets in the files (after an interval of say 64MB) and achieve the same effect. HDFS allows seeking to an offset in a file, so that will work too. -Tarandeep On Fri, Jun 19, 2009 at 4:33 PM, pmg parmod.me...@gmail.com wrote: Thanks Tarandeep for the prompt reply. Let me give you an example structure of FileA and FileB:

FileA
-----
123 ABC 1

FileB
-----
123 ABC 2
456 BNF 3

Both files are tab delimited. Every record is not simply compared with each record in FileB; there's a heuristic I am going to run for the comparison, scoring the results along with the output. So my output file is like this:

Output
------
123 ABC 1 123 ABC 2 10
123 ABC 1 456 BNF 3 20

The first 3 columns in the output file are from FileA, the next three columns are from FileB, and the last column is their comparison score. So basically you are saying we can use two map/reduce jobs, one for FileA and the other for FileB: map(FileA) -> reduce(FileA) -> map(FileB) -> reduce(FileB). For the first file FileA I map them with (k,v) (I can't use a bloom filter because the comparison between each record from FileA is not a straight comparison with every record in FileB - they are compared using a heuristic and scored for their quantitative comparison and stored). In the FileA reduce I store it in the distributed cache. Once this is done, map FileB in the second map, and in the FileB reduce read in FileA from the distributed cache, do my heuristics for every (k,v) from FileB, and store my result. thanks Tarandeep wrote: Oh my bad, I was not clear- For FileB, you will be running a second map reduce job. In the mapper, you can use the bloom filter created in the first map reduce job (if you wish to use it) to eliminate the lines whose keys don't match. The mapper will emit a key,value pair, where the key is the field on which you want to do the comparison and the value is the whole line. When the key,value pairs go to the reducers, you have lines from FileB sorted on the field you want to use for comparison. Now you can read the contents of FileA (note that if you ran the first job with N reducers, you will have N partitions of FileA and you want to read only the partition meant for this reducer). The contents of FileA are also sorted on the field, so now you can easily compare the