a = load '1.txt' as (a0, a1, a2);
b = load '2.txt' as (b0, b1);
c = filter b by b1==1;
d = join a by a0, c by b0 using 'replicated';
The first job will store c into a temporary file; the replicated join will consume it later. In a simpler query, the first job might just load and store the input data, which it seems could be optimized away (though Pig does not do that today).
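A quick way to confirm this is to run EXPLAIN on the final alias. A minimal sketch using the toy script above ('1.txt' and '2.txt' are just the placeholder inputs from that example):

{code}
-- same toy script as above; run "explain d" to see how it compiles
a = load '1.txt' as (a0, a1, a2);
b = load '2.txt' as (b0, b1);
c = filter b by b1==1;
d = join a by a0, c by b0 using 'replicated';
explain d;
-- the "Map Reduce Plan" section should show two nodes: one that stores c
-- into a temporary file, and one that loads a and runs the in-memory FRJoin
{/code}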
A Java heap error is expected, since your right-side input is too large to fit into memory. The error message you saw before is odd, though, and I suspect you might have an environment issue.
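For intuition on when the replicated side stops fitting, here is a back-of-envelope check. The numbers are mine, not from the thread; they assume the roughly 4x disk-to-memory expansion Renato mentions below and Hadoop's common default mapper heap of -Xmx200m:

{code}
-- rough sizing check (assumptions: ~4x expansion, 200MB default mapper heap)
--   replicating the 77MB input:  77MB * 4 =~ 308MB  > 200MB  -> heap error likely
--   replicating the 32MB input:  32MB * 4 =~ 128MB  < 200MB  -> tight, but may fit
{/code}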
Daniel

On 05/01/2011 04:31 PM, Renato Marroquín Mogrovejo wrote:
Anyone please?

2011/4/29 Renato Marroquín Mogrovejo <[email protected]>:

In spite of the fact that my execution plan says only one MapReduce job will be used, the webUI shows two MR jobs for the Pig task. I am probably missing something here in the middle, because, yeah, replicated joins should only use one MR job, right?
Another thing I find weird is that when I tried executing the FR join again, I got a JavaHeapSpace problem in its second job, whereas before I got an error saying something like Pig was expecting X bytes but was getting X+Y bytes. I haven't been able to reproduce that error; it probably had something to do with my environment at some point in time. I thought the error of Pig expecting X bytes and getting more than expected had something to do with Pig seeing about a 4x expansion when loading data from disk into memory; that is why I was asking how this check is done (available Java heap space > 4x file size, or something like this?).
Thanks again.

#-----------------------------------------------
# Logical Plan:
#-----------------------------------------------
Store 1-86 Schema: {proy_sR::sr_cde_sk: int,proy_cD::cd_dem_sk: int} Type: Unknown
|
|---LOJoin 1-25 Schema: {proy_sR::sr_cde_sk: int,proy_cD::cd_dem_sk: int} Type: bag
    |
    |   Project 1-23 Projections: [0] Overloaded: false FieldSchema: sr_cde_sk: int Type: int
    |   Input: ForEach 1-18
    |
    |   Project 1-24 Projections: [0] Overloaded: false FieldSchema: cd_dem_sk: int Type: int
    |   Input: ForEach 1-22
    |
    |---ForEach 1-18 Schema: {sr_cde_sk: int} Type: bag
    |   |
    |   |   Project 1-17 Projections: [0] Overloaded: false FieldSchema: sr_cde_sk: int Type: int
    |   |   Input: ForEach 1-66
    |   |
    |   |---ForEach 1-66 Schema: {sr_cde_sk: int} Type: bag
    |       |
    |       |   Cast 1-35 FieldSchema: sr_cde_sk: int Type: int
    |       |   |
    |       |   |---Project 1-34 Projections: [0] Overloaded: false FieldSchema: sr_cde_sk: bytearray Type: bytearray
    |       |       Input: Load 1-13
    |       |
    |       |---Load 1-13 Schema: {sr_cde_sk: bytearray} Type: bag
    |
    |---ForEach 1-22 Schema: {cd_dem_sk: int} Type: bag
        |
        |   Project 1-21 Projections: [0] Overloaded: false FieldSchema: cd_dem_sk: int Type: int
        |   Input: ForEach 1-85
        |
        |---ForEach 1-85 Schema: {cd_dem_sk: int} Type: bag
            |
            |   Cast 1-68 FieldSchema: cd_demo_sk: int Type: int
            |   |
            |   |---Project 1-67 Projections: [0] Overloaded: false FieldSchema: cd_dem_sk: bytearray Type: bytearray
            |       Input: Load 1-14
            |
            |---Load 1-14 Schema: {cd_dem_sk: bytearray} Type: bag

#--------------------------------------------------
# Physical Plan:
#--------------------------------------------------
Store(fakefile:org.apache.pig.builtin.PigStorage) - 1-107
|
|---FRJoin[tuple] - 1-101
    |   |
    |   Project[int][0] - 1-99
    |   |
    |   Project[int][0] - 1-100
    |
    |---New For Each(false)[bag] - 1-92
    |   |   |
    |   |   Project[int][0] - 1-91
    |   |
    |   |---New For Each(false)[bag] - 1-90
    |       |   |
    |       |   Cast[int] - 1-89
    |       |   |
    |       |   |---Project[bytearray][0] - 1-88
    |       |
    |       |---Load(hdfs://berlin.labbio:54310/user/hadoop/pigData/sr.dat:PigStorage('|')) - 1-87
    |
    |---New For Each(false)[bag] - 1-98
        |   |
        |   Project[int][0] - 1-97
        |
        |---New For Each(false)[bag] - 1-96
            |   |
            |   Cast[int] - 1-95
            |   |
            |   |---Project[bytearray][0] - 1-94
            |
            |---Load(hdfs://berlin.labbio:54310/user/hadoop/pigData/cd.dat:PigStorage('|')) - 1-93

2011-04-29 23:04:54,727 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MultiQueryOptimizer - MR plan size before optimization: 2
2011-04-29 23:04:54,727 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MultiQueryOptimizer - MR plan size after optimization: 2
#--------------------------------------------------
# Map Reduce Plan
#--------------------------------------------------
MapReduce node 1-109
Map Plan
Store(hdfs://berlin.labbio:54310/tmp/temp1815576246/tmp379673501:org.apache.pig.builtin.BinStorage) - 1-110
|
|---New For Each(false)[bag] - 1-98
    |   |
    |   Project[int][0] - 1-97
    |
    |---New For Each(false)[bag] - 1-96
        |   |
        |   Cast[int] - 1-95
        |   |
        |   |---Project[bytearray][0] - 1-94
        |
        |---Load(hdfs://berlin.labbio:54310/user/hadoop/pigData/cd.dat:PigStorage('|')) - 1-93
--------
Global sort: false
----------------

MapReduce node 1-108
Map Plan
Store(fakefile:org.apache.pig.builtin.PigStorage) - 1-107
|
|---FRJoin[tuple] - 1-101
    |   |
    |   Project[int][0] - 1-99
    |   |
    |   Project[int][0] - 1-100
    |
    |---New For Each(false)[bag] - 1-92
    |   |   |
    |   |   Project[int][0] - 1-91
    |   |
    |   |---New For Each(false)[bag] - 1-90
    |       |   |
    |       |   Cast[int] - 1-89
    |       |   |
    |       |   |---Project[bytearray][0] - 1-88
    |       |
    |       |---Load(hdfs://berlin.labbio:54310/user/hadoop/pigData/sr.dat:PigStorage('|')) - 1-87
--------
Global sort: false
----------------

2011/4/28 Daniel Dai <[email protected]>:

There should be only one job. Thanks Thejas for pointing it out.

Daniel

-----Original Message-----
From: Daniel Dai
Sent: Wednesday, April 27, 2011 7:18 PM
To: [email protected]
Cc: Renato Marroquín Mogrovejo; [email protected]
Subject: Re: Error Executing a Fragment Replicated Join

Do you see the failure in the first job (sampling) or the second job? Do you see the exception right after the job kicks off? If the replicated side is too large, you will probably see a "Java heap exception" rather than a job setup exception. It looks more like an environment issue. Check whether you can run a regular join, and whether you have another hadoop config file in your classpath.

Daniel

On 04/27/2011 05:26 PM, Renato Marroquín Mogrovejo wrote:

Now that the Apache server is OK with me again, I can write back to the list. I wrote to the Apache Infra team and they told me to send messages just in plain text, disabling any HTML within the message (not that I ever sent HTML, but oh well); I guess that worked :)
Well, first, thanks for answering. I am using Pig 0.7 and my Pig script is as follows:

{code}
sr = LOAD 'pigData/sr.dat' using PigStorage('|') AS (sr_ret_date_sk:int,
    sr_ret_tim_sk:int, sr_ite_sk:int, sr_cus_sk:int, sr_cde_sk:int,
    sr_hde_sk:int, sr_add_sk:int, sr_sto_sk:int, sr_rea_sk:int,
    sr_tic_num:int, sr_ret_qua:int, sr_ret_amt:double, sr_ret_tax:double,
    sr_ret_amt_inc_tax:double, sr_fee:double, sr_ret_sh_cst:double,
    sr_ref_csh:double, sr_rev_cha:double, sr_sto_cred:double,
    sr_net_lss:double);
cd = LOAD 'pigData/cd.dat' using PigStorage('|') AS (cd_dem_sk:int,
    cd_gnd:chararray, cd_mrt_sts:chararray, cd_edt_sts:chararray,
    cd_pur_est:int, cd_cred_rtg:chararray, cd_dep_cnt:int,
    cd_dep_emp_cnt:int, cd_dep_col_count:int);
proy_sR = FOREACH sr GENERATE sr_cde_sk;
proy_cD = FOREACH cd GENERATE cd_dem_sk;
join_sR_cD = JOIN proy_sR BY sr_cde_sk, proy_cD BY cd_dem_sk USING 'replicated';
STORE join_sR_cD INTO 'queryResults/query.11.sr.cd.5.1' using PigStorage('|');
{/code}

Here "cd" is the 77MB relation and "sr" the 32MB one. I had some other similar queries in which the 32MB relation was joined with smaller relations (<10MB) and hit the same problem; I modified those so the <10MB relations would be the ones being replicated.
Thanks again.

Renato M.

2011/4/27 Thejas M Nair <[email protected]>:

The exception indicates that the hadoop job creation failed. Are you able to run simple MR queries using each of the inputs?
It could also be caused by some problem Pig is having with copying the file being replicated to the distributed cache.

-Thejas

On 4/27/11 3:42 PM, "Renato Marroquín Mogrovejo" <[email protected]> wrote:

Does anybody have any suggestions? Please??? Thanks again.

Renato M.

2011/4/26 Alan Gates <[email protected]>:

Sent for Renato, since Apache's mail system has decided it doesn't like him.

Alan.

I am getting an error while trying to execute a simple fragment replicated join on two files (one of 77MB and the other of 32MB). I am using the 32MB file as the small one to be replicated, but I keep getting this error. Does anybody know how this count is done? I mean, how does Pig determine that the small file is not small enough, and how could I modify this? I am executing these on four PCs with 3GB of RAM running Debian Lenny. Thanks in advance.

Renato M.

Pig Stack Trace
---------------
ERROR 2017: Internal error creating job configuration.

org.apache.pig.backend.executionengine.ExecException: ERROR 2043: Unexpected error during execution.
        at org.apache.pig.backend.hadoop.executionengine.HExecutionEngine.execute(HExecutionEngine.java:332)
        at org.apache.pig.PigServer.executeCompiledLogicalPlan(PigServer.java:835)
        at org.apache.pig.PigServer.execute(PigServer.java:828)
        at org.apache.pig.PigServer.access$100(PigServer.java:105)
        at org.apache.pig.PigServer$Graph.execute(PigServer.java:1080)
        at org.apache.pig.PigServer.executeBatch(PigServer.java:288)
        at org.apache.pig.tools.grunt.GruntParser.executeBatch(GruntParser.java:109)
        at org.apache.pig.tools.grunt.GruntParser.parseStopOnError(GruntParser.java:166)
        at org.apache.pig.tools.grunt.GruntParser.parseStopOnError(GruntParser.java:138)
        at org.apache.pig.tools.grunt.Grunt.exec(Grunt.java:89)
        at org.apache.pig.Main.main(Main.java:391)
Caused by: org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobCreationException: ERROR 2017: Internal error creating job configuration.
        at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler.getJob(JobControlCompiler.java:624)
        at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler.compile(JobControlCompiler.java:246)
        at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher.--

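Worth noting about the script quoted in this thread: in a fragment-replicated join, Pig loads every relation listed after the first into memory, and the JOIN above lists proy_cD (built from the 77MB cd.dat) second, so the larger side is the one being replicated. A minimal sketch of the reordering, assuming the intent was to replicate the smaller sr-derived relation:

{code}
-- a sketch, not from the thread: list the larger relation first (the
-- fragmented side) and the smaller one last (the side loaded into memory)
join_sR_cD = JOIN proy_cD BY cd_dem_sk, proy_sR BY sr_cde_sk USING 'replicated';
STORE join_sR_cD INTO 'queryResults/query.11.sr.cd.5.1' using PigStorage('|');
{/code}

Note the output column order flips with the reordering; even then, by the rough 4x estimate earlier in the thread the replicated side still lands near 128MB in memory, so a larger mapper heap may also be needed.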