It was actually zlib compression in Python. But you can certainly use any other compression lib. Unfortunately, this is not a built-in feature in Pig, but I agree that would be useful.
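For anyone looking for a starting point, below is a minimal sketch of what such a zlib-based Jython UDF could look like. The file name, function names, and the tab/newline flattening are illustrative assumptions rather than the actual UDF used above; it assumes Pig's Jython UDF support, where the script is registered with something like REGISTER 'bag_zip.py' USING jython AS bagzip; and then called as bagzip.compress_bag(data) in a foreach after the group-by.

# bag_zip.py -- hypothetical sketch of a zlib bag-compression UDF (not the
# exact UDF from this thread): pack a grouped bag into one small blob and
# restore it later.
import base64
import zlib

try:
    outputSchema  # injected by Pig's Jython UDF runtime when the script is registered
except NameError:
    def outputSchema(schema):  # no-op fallback so the file also imports outside Pig
        return lambda func: func

@outputSchema('packed:chararray')
def compress_bag(bag):
    # A Pig bag arrives roughly as a list of tuples; flatten it into one string.
    # The tab/newline encoding is an assumption -- use any scheme you can reverse.
    flat = '\n'.join('\t'.join(str(field) for field in row) for row in bag)
    # Compress, then base64-encode so the blob round-trips safely as a chararray.
    return base64.b64encode(zlib.compress(flat, 9))

@outputSchema('rows:chararray')
def decompress_bag(packed):
    # Inverse of compress_bag; split the result back into rows downstream.
    return zlib.decompress(base64.b64decode(packed))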
On Fri, Feb 7, 2014 at 11:45 AM, praveenesh kumar <[email protected]> wrote:

Hi Park,

Your explanation makes perfect sense in my case. Thanks for explaining what is happening behind the scenes. I am wondering whether you used plain Java compression/decompression, whether there is a UDF already available to do this, or whether there is some property we need to enable to tell Pig to compress bags before spilling.

Regards
Prav

On Fri, Feb 7, 2014 at 4:37 PM, Cheolsoo Park <[email protected]> wrote:

Hi Prav,

You're thinking correctly, and it's true that Pig bags are spillable.

However, spilling is no magic, meaning you can still run into OOM with huge bags like you have here. Pig runs the Spillable Memory Manager (SMM) in a separate thread. When spilling is triggered, the SMM locks the bags that it is trying to spill to disk; after the spilling is finished, GC frees up the memory. The problem is that more bags can be loaded into memory while the spilling is in progress. The JVM then triggers GC, but GC cannot free up memory because the SMM is locking the bags, resulting in an OOM error. This happens quite often.

It sounds like you do the group-by to reduce the number of rows before the join and don't immediately run any aggregation function on the grouped bags. If that's the case, can you compress those bags? For example, you could add a foreach after the group-by and run a UDF that compresses each bag and returns it as a bytearray. From there, you're moving around small blobs rather than big bags. Of course, you will need to decompress them when you restore data out of those bags at some point. This trick saved me several times in the past, particularly when I dealt with bags of large chararrays.

Just a thought. Hope this is helpful.

Thanks,
Cheolsoo
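As a rough, standalone illustration of why this helps with bags of large chararrays (the data below is made up, not from this job), zlib on a wide, repetitive bag typically turns a big bag into a small blob:

# Toy illustration of the bag-compression trick: compare the size of a
# flattened bag of repetitive chararrays with its zlib-compressed form.
import zlib

# Pretend this is one grouped bag: 10,000 rows with a long, repetitive field.
bag = [('user_%d' % i, 'clicked_homepage_banner_campaign_2014 ' * 20)
       for i in range(10000)]

flat = '\n'.join('\t'.join(row) for row in bag)
packed = zlib.compress(flat.encode('utf-8'), 9)

print('raw bag bytes:    %d' % len(flat))
print('packed bag bytes: %d' % len(packed))
print('ratio:            %.1fx smaller' % (len(flat) / float(len(packed))))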
On Fri, Feb 7, 2014 at 7:37 AM, praveenesh kumar <[email protected]> wrote:

Thanks Park for sharing the above configs.

But I am wondering whether the above config changes would make any real difference in my case. As per my logs, I am very worried about this line:

INFO org.apache.hadoop.mapred.MapTask: Record too large for in-memory buffer: 644245358 bytes

If I am understanding it properly, one of my records is too large to fit into the memory buffer, which is causing the issue. None of the above changes would make a huge impact; please correct me if I am taking it totally wrong.

(Adding the hadoop user group here as well, to get some valuable input on the above question.)

Since I am doing a join on a grouped bag, do you think that might be the cause? But if that is the issue, then as far as I understand bags in Pig are spillable, so it shouldn't have given this issue.

I can't get rid of the group-by; grouping first should ideally improve my join. But if this is the root cause, and if I am understanding it correctly, do you think I should get rid of the group-by? My question in that case would be: what happens if I do the group-by later, after the join? It would result in a much bigger bag (because there would be more records after the join).

Am I thinking about this correctly?

Regards
Prav

On Fri, Feb 7, 2014 at 3:11 AM, Cheolsoo Park <[email protected]> wrote:

Looks like you're running out of space in MapOutputBuffer. Two suggestions:

1) You said that io.sort.mb is already set to 768 MB, but did you try to lower io.sort.spill.percent in order to spill earlier and more often? See page 12 of http://www.slideshare.net/Hadoop_Summit/optimizing-mapreduce-job-performance

2) Can't you increase the parallelism of mappers so that each mapper handles a smaller amount of data? Pig determines the number of mappers as total input size / pig.maxCombinedSplitSize (128 MB by default), so you can try lowering pig.maxCombinedSplitSize.

But I admit Pig's internal data types are not memory-efficient, and that is an optimization opportunity. Contribute!
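To make the arithmetic behind these two knobs concrete, here is a back-of-envelope sketch using figures quoted in this thread (the real MapOutputBuffer accounting also reserves part of io.sort.mb for record metadata, and the 50 GB input size is purely an assumed example):

# Rough arithmetic behind suggestions 1) and 2), using numbers from this thread.
data_buffer = 644245088    # "data buffer = .../644245088" in the log below (io.sort.mb = 768)
record_bytes = 644245358   # "Record too large for in-memory buffer: 644245358 bytes" quoted above

# 1) Lowering io.sort.spill.percent moves the soft limit at which spills start.
for spill_percent in (0.95, 0.80, 0.50):
    soft_limit = int(data_buffer * spill_percent)
    print('io.sort.spill.percent=%.2f -> spill starts near %d bytes' % (spill_percent, soft_limit))

# A single serialized record larger than the whole buffer can never be held in
# memory regardless of the spill percent; it falls through to spillSingleRecord().
print('record > whole buffer: %s' % (record_bytes > data_buffer))

# 2) More mappers, each with less input: Pig sizes map tasks as roughly
#    total input size / pig.maxCombinedSplitSize (128 MB by default).
total_input = 50 * 1024 ** 3   # assume 50 GB of input, purely illustrative
for split_mb in (128, 64, 32):
    mappers = -(-total_input // (split_mb * 1024 ** 2))   # ceiling division
    print('pig.maxCombinedSplitSize=%dMB -> ~%d mappers' % (split_mb, mappers))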
On Thu, Feb 6, 2014 at 2:54 PM, praveenesh kumar <[email protected]> wrote:

It's a normal join. I can't use a replicated join, as the data is very large.

Regards
Prav

On Thu, Feb 6, 2014 at 7:52 PM, abhishek <[email protected]> wrote:

Hi Praveenesh,

Did you use a "replicated join" in your Pig script, or is it a regular join?

Regards
Abhishek

Sent from my iPhone

On Feb 6, 2014, at 11:25 AM, praveenesh kumar <[email protected]> wrote:

Hi all,

I am running a Pig script which runs fine for small data, but when I scale the data up, I get the following error at my map stage. Please refer to the map logs below.

My Pig script does a group-by first, followed by a join on the grouped data.

Any clues on where I should look or how I should deal with this situation? I don't want to just keep increasing the heap space. My map JVM heap space is already 3 GB with io.sort.mb = 768 MB.

2014-02-06 19:15:12,243 WARN org.apache.hadoop.util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2014-02-06 19:15:15,025 INFO org.apache.hadoop.util.ProcessTree: setsid exited with exit code 0
2014-02-06 19:15:15,123 INFO org.apache.hadoop.mapred.Task: Using ResourceCalculatorPlugin : org.apache.hadoop.util.LinuxResourceCalculatorPlugin@2bd9e282
2014-02-06 19:15:15,546 INFO org.apache.hadoop.mapred.MapTask: io.sort.mb = 768
2014-02-06 19:15:19,846 INFO org.apache.hadoop.mapred.MapTask: data buffer = 612032832/644245088
2014-02-06 19:15:19,846 INFO org.apache.hadoop.mapred.MapTask: record buffer = 9563013/10066330
2014-02-06 19:15:20,037 INFO org.apache.hadoop.io.compress.CodecPool: Got brand-new decompressor
2014-02-06 19:15:21,083 INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigRecordReader: Created input record counter: Input records from _1_tmp1327641329
2014-02-06 19:15:52,894 INFO org.apache.hadoop.mapred.MapTask: Spilling map output: buffer full= true
2014-02-06 19:15:52,895 INFO org.apache.hadoop.mapred.MapTask: bufstart = 0; bufend = 611949600; bufvoid = 644245088
2014-02-06 19:15:52,895 INFO org.apache.hadoop.mapred.MapTask: kvstart = 0; kvend = 576; length = 10066330
2014-02-06 19:16:06,182 INFO org.apache.hadoop.mapred.MapTask: Finished spill 0
2014-02-06 19:16:16,169 INFO org.apache.pig.impl.util.SpillableMemoryManager: first memory handler call - Collection threshold init = 328728576(321024K) used = 1175055104(1147514K) committed = 1770848256(1729344K) max = 2097152000(2048000K)
2014-02-06 19:16:20,446 INFO org.apache.pig.impl.util.SpillableMemoryManager: Spilled an estimate of 308540402 bytes from 1 objects. init = 328728576(321024K) used = 1175055104(1147514K) committed = 1770848256(1729344K) max = 2097152000(2048000K)
2014-02-06 19:17:22,246 INFO org.apache.pig.impl.util.SpillableMemoryManager: first memory handler call - Usage threshold init = 328728576(321024K) used = 1768466512(1727018K) committed = 1770848256(1729344K) max = 2097152000(2048000K)
2014-02-06 19:17:35,597 INFO org.apache.pig.impl.util.SpillableMemoryManager: Spilled an estimate of 1073462600 bytes from 1 objects. init = 328728576(321024K) used = 1768466512(1727018K) committed = 1770848256(1729344K) max = 2097152000(2048000K)
2014-02-06 19:18:01,276 INFO org.apache.hadoop.mapred.MapTask: Spilling map output: buffer full= true
2014-02-06 19:18:01,288 INFO org.apache.hadoop.mapred.MapTask: bufstart = 611949600; bufend = 52332788; bufvoid = 644245088
2014-02-06 19:18:01,288 INFO org.apache.hadoop.mapred.MapTask: kvstart = 576; kvend = 777; length = 10066330
2014-02-06 19:18:03,377 INFO org.apache.hadoop.mapred.MapTask: Finished spill 1
2014-02-06 19:18:05,494 INFO org.apache.hadoop.mapred.MapTask: Record too large for in-memory buffer: 644246693 bytes
2014-02-06 19:18:36,008 INFO org.apache.pig.impl.util.SpillableMemoryManager: Spilled an estimate of 306271368 bytes from 1 objects. init = 328728576(321024K) used = 1449267128(1415299K) committed = 2097152000(2048000K) max = 2097152000(2048000K)
2014-02-06 19:18:44,448 INFO org.apache.hadoop.mapred.TaskLogsTruncater: Initializing logs' truncater with mapRetainSize=-1 and reduceRetainSize=-1
2014-02-06 19:18:44,780 FATAL org.apache.hadoop.mapred.Child: Error running child : java.lang.OutOfMemoryError: Java heap space
    at java.util.Arrays.copyOf(Arrays.java:2786)
    at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:94)
    at java.io.DataOutputStream.write(DataOutputStream.java:90)
    at java.io.DataOutputStream.writeUTF(DataOutputStream.java:384)
    at java.io.DataOutputStream.writeUTF(DataOutputStream.java:306)
    at org.apache.pig.data.BinInterSedes.writeDatum(BinInterSedes.java:454)
    at org.apache.pig.data.BinInterSedes.writeTuple(BinInterSedes.java:542)
    at org.apache.pig.data.BinInterSedes.writeBag(BinInterSedes.java:523)
    at org.apache.pig.data.BinInterSedes.writeDatum(BinInterSedes.java:361)
    at org.apache.pig.data.BinInterSedes.writeTuple(BinInterSedes.java:542)
    at org.apache.pig.data.BinInterSedes.writeDatum(BinInterSedes.java:357)
    at org.apache.pig.data.BinSedesTuple.write(BinSedesTuple.java:57)
    at org.apache.pig.impl.io.PigNullableWritable.write(PigNullableWritable.java:123)
    at org.apache.hadoop.io.serializer.WritableSerialization$WritableSerializer.serialize(WritableSerialization.java:90)
    at org.apache.hadoop.io.serializer.WritableSerialization$WritableSerializer.serialize(WritableSerialization.java:77)
    at org.apache.hadoop.mapred.IFile$Writer.append(IFile.java:179)
    at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.spillSingleRecord(MapTask.java:1501)
    at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.collect(MapTask.java:1091)
    at org.apache.hadoop.mapred.MapTask$NewOutputCollector.write(MapTask.java:691)
    at org.apache.hadoop.mapreduce.TaskInputOutputContext.write(TaskInputOutputContext.java:80)
    at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigGenericMapReduce$Map.collect(PigGenericMapReduce.java:128)
    at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigGenericMapBase.runPipeline(PigGenericMapBase.java:269)
    at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigGenericMapBase.map(PigGenericMapBase.java:262)
    at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigGenericMapBase.map(PigGenericMapBase.java:64)
    at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:144)
    at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:764)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:370)
    at org.apache.hadoop.mapred.Child$4.run(Child.java:255)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:396)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1121)
    at org.apache.hadoop.mapred.Child.main(Child.java:249)
