JIRA logged: https://issues.apache.org/jira/browse/CRUNCH-614
On Wed, Jul 27, 2016 at 9:25 AM Ben Roling <[email protected]> wrote:

> And since I failed to answer your question directly - yes, I think the solution could be as simple as switching back to the KeyValue constructor.
>
> On Wed, Jul 27, 2016 at 9:23 AM Ben Roling <[email protected]> wrote:
>
>> Thanks Josh. I wondered whether it was a mistake or an intentional change. My hope was that it was a mistake, and that seems like a reasonable explanation, since I see no specific indication that it was intentional. I'll run with that assumption, open a JIRA, submit a PR, and test out the change.
>>
>> On Tue, Jul 26, 2016 at 10:44 AM Josh Wills <[email protected]> wrote:
>>
>>> Hey Ben,
>>>
>>> I suspect it was just a mistake -- it looks like we were consolidating some common patterns in the code that had different use cases (i.e., defensive copies on reads vs. not doing that on sorts). Do you suspect the fix is as easy as reverting to the KeyValue constructor here?
>>>
>>> J
>>>
>>> On Mon, Jul 25, 2016 at 3:17 PM, Ben Roling <[email protected]> wrote:
>>>
>>>> We've got a Crunch pipeline that does an HBase bulk load, which we're migrating from a CDH4 cluster to a CDH5 cluster. In porting it, the job has gone from crunch-0.8.4 to crunch-0.11.0. We've noticed the job is dramatically slower on the CDH5 cluster, and I've been digging to figure out why.
>>>>
>>>> The first job in the pipeline writes a PCollection<KeyValue> out to HDFS, and that job behaves fine. The next job uses HFileUtils.writeToHFilesForIncrementalLoad to read that PCollection and generate the HFiles for the bulk load. In the map phase of the instance of that job that I am debugging, there are 1168 map tasks. Nearly all of them complete in 2 minutes or less. There are 2 exceptional tasks: one takes 37 minutes and the other takes *11.5 hours*.
>>>> The input/output records and bytes counters on the tasks don't readily explain the problem; there are other tasks with larger input and output that completed in 2 minutes or less.
>>>>
>>>> The long-running tasks were always stuck here:
>>>>
>>>> 2016-07-21 22:55:33,544 INFO [main] org.apache.hadoop.mapred.Merger: Down to the last merge-pass, with 4 segments left of total size: 72987232 bytes
>>>>
>>>> The next output, after it finally finishes 11.5 hours later, is:
>>>>
>>>> 2016-07-22 10:30:52,594 INFO [main] org.apache.hadoop.mapred.Merger: Merging 4 sorted segments
>>>>
>>>> I gathered some stack samples and hooked up remote debugging to figure out what was making the Merger take so long. The stack traces always looked like this:
>>>>
>>>> "main@1" prio=5 tid=0x1 nid=NA runnable
>>>>   java.lang.Thread.State: RUNNABLE
>>>>     at org.apache.hadoop.hbase.KeyValue.create(KeyValue.java:2420)
>>>>     at org.apache.hadoop.hbase.KeyValue.create(KeyValue.java:2401)
>>>>     at org.apache.crunch.io.hbase.HBaseTypes.bytesToKeyValue(HBaseTypes.java:127)
>>>>     at org.apache.crunch.io.hbase.HFileUtils$KeyValueComparator.compare(HFileUtils.java:257)
>>>>     at org.apache.hadoop.mapred.Merger$MergeQueue.lessThan(Merger.java:587)
>>>>     at org.apache.hadoop.util.PriorityQueue.downHeap(PriorityQueue.java:141)
>>>>     at org.apache.hadoop.util.PriorityQueue.adjustTop(PriorityQueue.java:108)
>>>>     at org.apache.hadoop.mapred.Merger$MergeQueue.adjustPriorityQueue(Merger.java:524)
>>>>     at org.apache.hadoop.mapred.Merger$MergeQueue.next(Merger.java:547)
>>>>     at org.apache.hadoop.mapred.Merger.writeFile(Merger.java:209)
>>>>     at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.mergeParts(MapTask.java:1911)
>>>>     at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.flush(MapTask.java:1507)
>>>>     at org.apache.hadoop.mapred.MapTask$NewOutputCollector.close(MapTask.java:723)
>>>>     at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:793)
>>>>     at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341)
>>>>     at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:163)
>>>>     at java.security.AccessController.doPrivileged(AccessController.java:-1)
>>>>     at javax.security.auth.Subject.doAs(Subject.java:422)
>>>>     at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1671)
>>>>     at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158)
>>>>
>>>> In the debugger, I can see the KeyValue.create() method copying a 59MB byte array. Every time I pause the debugger, it seems to be copying that same 59MB array. I can step through the loop in Merger.writeFile() and see progress being made. Stepping through interactively, it doesn't seem slow, but I did some timing and found it processing at a rate of about 43 records per second. The task has 2,794,137 records to process, so at that rate it is very slow.
>>>>
>>>> Looking through the history of Crunch, I see that HFileUtils$KeyValueComparator changed between crunch-0.8.4 (used by the CDH4 version of the job) and crunch-0.11 (used by the CDH5 version). The following commit changed the behavior to invoke HBaseTypes.bytesToKeyValue:
>>>>
>>>> https://github.com/apache/crunch/commit/a959ee6c7fc400d1f455b0742641c54de1dec0bf#diff-bc76ce0b41704c9c4efbfa1aab53588d
>>>>
>>>> Previously, new KeyValue(byte[], int, int) was invoked, which does not copy the byte array.
>>>>
>>>> I think this byte array copy is the cause of the dramatic slowdown. I suspect the ordering of the KeyValues in the data plays a factor: my guess is that a large (59MB) HBase KeyValue is positioned such that the code must repeatedly compare other KeyValues against it.
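[Editor's note: the create-vs-constructor distinction described above can be sketched in plain Java. MiniKeyValue below is a hypothetical stand-in, not HBase's real KeyValue; it only models the split the thread describes, where create() copies the byte range while the (byte[], int, int) constructor merely records a reference, offset, and length.]

```java
import java.util.Arrays;

// Toy model of the two KeyValue construction paths discussed in the thread.
// MiniKeyValue is hypothetical; it mirrors the claimed HBase split:
// KeyValue.create(...) copies, new KeyValue(byte[], int, int) wraps.
public class MiniKeyValue {
    final byte[] bytes;
    final int offset;
    final int length;

    // Wrapping constructor: O(1), shares the caller's backing array.
    MiniKeyValue(byte[] bytes, int offset, int length) {
        this.bytes = bytes;
        this.offset = offset;
        this.length = length;
    }

    // create(): O(length) copy of the range into a fresh array, which is
    // what makes it expensive to call once per comparison.
    static MiniKeyValue create(byte[] src, int offset, int length) {
        byte[] copy = Arrays.copyOfRange(src, offset, offset + length);
        return new MiniKeyValue(copy, 0, length);
    }

    public static void main(String[] args) {
        byte[] backing = new byte[] {0, 1, 2, 3, 4};

        MiniKeyValue wrapped = new MiniKeyValue(backing, 1, 3);
        MiniKeyValue copied = MiniKeyValue.create(backing, 1, 3);

        System.out.println(wrapped.bytes == backing);  // true: no copy made
        System.out.println(copied.bytes == backing);   // false: fresh array
    }
}
```

Under this model, swapping create() for the wrapping constructor in a hot comparator removes an allocation plus a full array copy per call.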
>>>> Every time the comparator is invoked, it does this deep copy of the KeyValue, and since the KeyValue is large, the copy is really expensive.
>>>>
>>>> Has anyone noticed this problem before? Is the byte array copy intentional and necessary? Looking at master, it seems like it is still there in the latest code.
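[Editor's note: the deep-copy-per-comparison cost described in the thread can be illustrated without any HBase or Crunch dependency. The sketch below uses hypothetical COPYING and WRAPPING comparators over raw byte arrays, not the actual HFileUtils$KeyValueComparator; record layout and sizes are invented for illustration.]

```java
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Comparator;

// Sketch of the two comparator styles in the thread: copy-per-compare
// (analogous to KeyValue.create() in the 0.11 comparator) vs in-place
// comparison over the existing bytes (analogous to the old wrapping path).
public class CompareStyles {

    // Copy-per-compare: materialize a fresh copy of each record before
    // comparing. Each call allocates and fills arrays proportional to
    // record size, like the deep copy observed in the debugger.
    static final Comparator<byte[]> COPYING = (a, b) -> {
        byte[] ca = Arrays.copyOf(a, a.length);   // deep copy, O(record size)
        byte[] cb = Arrays.copyOf(b, b.length);   // deep copy, O(record size)
        return ByteBuffer.wrap(ca).compareTo(ByteBuffer.wrap(cb));
    };

    // Wrap-per-compare: compare in place; no allocation, no copying.
    static final Comparator<byte[]> WRAPPING = (a, b) ->
        ByteBuffer.wrap(a).compareTo(ByteBuffer.wrap(b));

    public static void main(String[] args) {
        byte[] small = new byte[] {1, 2, 3};
        byte[] big = new byte[8 * 1024 * 1024];  // scaled-down stand-in for the 59MB record
        big[0] = 9;

        // Both comparators agree on the ordering...
        System.out.println(Integer.signum(COPYING.compare(small, big)));   // -1
        System.out.println(Integer.signum(WRAPPING.compare(small, big)));  // -1
        // ...but COPYING re-copies the big record on every call, which is
        // the pattern that turns a merge over millions of records into hours.
    }
}
```

A merge heap invokes its comparator on every sift, so if one huge record sits near the top of the heap, the copying style pays for that record's full size again and again, which matches the observed ~43 records/second.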
