Thanks, Josh. I wondered whether it was a mistake or an intentional change. I was hoping it was a mistake, and that seems like a reasonable explanation since I see no specific indication that it was intentional. I'll run with that assumption, open a JIRA, submit a PR, and test out the change.
On Tue, Jul 26, 2016 at 10:44 AM Josh Wills <[email protected]> wrote:

> Hey Ben,
>
> I suspect it was just a mistake-- it looks like we were consolidating some
> common patterns in the code that had different use cases (i.e., defensive
> copies on reads vs. not doing that on sorts.) Do you suspect the fix is as
> easy as reverting to the KeyValue constructor here?
>
> J
>
> On Mon, Jul 25, 2016 at 3:17 PM, Ben Roling <[email protected]> wrote:
>
>> We've got a Crunch pipeline that does an HBase bulk load that we're
>> migrating from a CDH4 to a CDH5 cluster. In the process of porting the
>> job, the job has gone from using crunch-0.8.4 to using crunch-0.11.0.
>> We've noticed the job is dramatically slower when it runs in our CDH5
>> cluster and I've been doing some digging to try to figure out why.
>>
>> The first job in the pipeline spits a PCollection<KeyValue> out to HDFS
>> and that job seems to behave fine. The next job uses
>> HFileUtils.writeToHFilesForIncrementalLoad to read that PCollection and
>> generate the HFiles for the bulk load. In the map phase of the instance of
>> that job that I am debugging, there are 1168 map tasks. Nearly all of the
>> map tasks complete in 2 minutes or less per task. There are 2 exceptional
>> tasks. One takes 37 minutes and the other takes *11.5 hours*. The
>> input and output records and bytes counters on the tasks don't quickly
>> explain the problem. There are other tasks with larger input and output
>> that completed in 2 minutes or less.
>>
>> The long running tasks were always stuck here:
>>
>> 2016-07-21 22:55:33,544 INFO [main] org.apache.hadoop.mapred.Merger: Down to the last merge-pass, with 4 segments left of total size: 72987232 bytes
>>
>> The next output after it finally finishes 11.5 hours later is:
>>
>> 2016-07-22 10:30:52,594 INFO [main] org.apache.hadoop.mapred.Merger: Merging 4 sorted segments
>>
>> I gathered some stack samples and hooked up remote debugging to try to
>> figure out what was going on to make the Merger take so long. The stack
>> traces always looked like this:
>>
>> "main@1" prio=5 tid=0x1 nid=NA runnable
>>   java.lang.Thread.State: RUNNABLE
>>     at org.apache.hadoop.hbase.KeyValue.create(KeyValue.java:2420)
>>     at org.apache.hadoop.hbase.KeyValue.create(KeyValue.java:2401)
>>     at org.apache.crunch.io.hbase.HBaseTypes.bytesToKeyValue(HBaseTypes.java:127)
>>     at org.apache.crunch.io.hbase.HFileUtils$KeyValueComparator.compare(HFileUtils.java:257)
>>     at org.apache.hadoop.mapred.Merger$MergeQueue.lessThan(Merger.java:587)
>>     at org.apache.hadoop.util.PriorityQueue.downHeap(PriorityQueue.java:141)
>>     at org.apache.hadoop.util.PriorityQueue.adjustTop(PriorityQueue.java:108)
>>     at org.apache.hadoop.mapred.Merger$MergeQueue.adjustPriorityQueue(Merger.java:524)
>>     at org.apache.hadoop.mapred.Merger$MergeQueue.next(Merger.java:547)
>>     at org.apache.hadoop.mapred.Merger.writeFile(Merger.java:209)
>>     at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.mergeParts(MapTask.java:1911)
>>     at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.flush(MapTask.java:1507)
>>     at org.apache.hadoop.mapred.MapTask$NewOutputCollector.close(MapTask.java:723)
>>     at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:793)
>>     at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341)
>>     at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:163)
>>     at java.security.AccessController.doPrivileged(AccessController.java:-1)
>>     at javax.security.auth.Subject.doAs(Subject.java:422)
>>     at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1671)
>>     at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158)
>>
>> In the debugger, I can see the KeyValue.create() method is copying a 59MB
>> byte array. Every time I pause the debugger, it seems to be copying that
>> same 59MB byte array. I can step through the loop in Merger.writeFile() and
>> see progress being made. When stepping through interactively it doesn’t
>> seem slow, but I did some timing and found that it appears to be going at a
>> rate of about 43 records per second. The task has 2,794,137 records to
>> process, and as such that is very slow.
>>
>> Looking through the history of crunch, I see that
>> HFileUtils$KeyValueComparator has changed between crunch-0.8.4 (used in the
>> version of the job run with CDH4) and crunch-0.11 (used in the version of
>> the job with CDH5). The following commit changed the behavior to invoke
>> HBaseTypes.bytesToKeyValue:
>>
>> https://github.com/apache/crunch/commit/a959ee6c7fc400d1f455b0742641c54de1dec0bf#diff-bc76ce0b41704c9c4efbfa1aab53588d
>>
>> Previously, new KeyValue(byte[], int, int) was invoked, which does not do
>> a byte array copy.
>>
>> I think this byte array copy is the cause of the dramatic slowdown. I
>> suspect the order of the KeyValue pairs in the data must play a factor. My
>> guess is the data is ordered in such a way where a large HBase KeyValue
>> (59MB) is placed in such a way that the code must repeatedly do comparisons
>> of other KeyValues to that KeyValue. Every time the comparator is invoked,
>> it does this deep copy of the KeyValue and since the KeyValue is large, the
>> copy is really expensive.
>>
>> Has anyone noticed this problem before? Is the byte array copy
>> intentional and necessary? Looking at master it seems like it is still
>> there in the latest code.
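
To illustrate the cost difference described in the quoted message, here is a minimal sketch in plain Java. It does not use the HBase or Crunch classes: `SliceCompare`, `compareWithCopy`, and `compareInPlace` are hypothetical names, and the unsigned byte-wise ordering is only a stand-in for the real KeyValue comparison. The copying variant allocates and fills two new arrays on every call, so its cost grows with the size of the records being compared, which is the behavior suspected for the 59MB KeyValue. The in-place variant only reads from the existing buffers.

```java
import java.util.Arrays;

// Hypothetical illustration only; not the Crunch or HBase implementation.
final class SliceCompare {

    // Copying variant: materialises both slices into fresh arrays before
    // comparing, analogous to a defensive copy made on every compare call.
    static int compareWithCopy(byte[] a, int aOff, int aLen,
                               byte[] b, int bOff, int bLen) {
        byte[] left = Arrays.copyOfRange(a, aOff, aOff + aLen);
        byte[] right = Arrays.copyOfRange(b, bOff, bOff + bLen);
        return compareInPlace(left, 0, left.length, right, 0, right.length);
    }

    // In-place variant: unsigned lexicographic comparison that reads the
    // original buffers through offset and length, with no allocation.
    static int compareInPlace(byte[] a, int aOff, int aLen,
                              byte[] b, int bOff, int bLen) {
        int n = Math.min(aLen, bLen);
        for (int i = 0; i < n; i++) {
            int x = a[aOff + i] & 0xff;
            int y = b[bOff + i] & 0xff;
            if (x != y) {
                return x - y;
            }
        }
        return aLen - bLen;
    }

    public static void main(String[] args) {
        byte[] big = new byte[8 * 1024 * 1024]; // stand-in for one large record
        byte[] small = {1, 2, 3};
        int viaCopy = compareWithCopy(big, 0, big.length, small, 0, small.length);
        int inPlace = compareInPlace(big, 0, big.length, small, 0, small.length);
        // Both variants should agree on ordering; only the work done differs.
        System.out.println("same ordering: "
                + (Integer.signum(viaCopy) == Integer.signum(inPlace)));
    }
}
```

In the real code path the analogous choice would be between a factory method that copies the backing array and a constructor that wraps it, as the quoted message describes. Whether reverting is sufficient still needs to be confirmed by testing.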
