And since I failed to answer your question directly - yes, I think the solution could be as simple as switching back to the KeyValue constructor.

On Wed, Jul 27, 2016 at 9:23 AM Ben Roling <[email protected]> wrote:

> Thanks Josh. I wondered if it was a mistake or an intentional change. My
> hope was mistake and that seems like a reasonable explanation as there is
> no specific indicator I see that it was intentional. I'll run with that
> assumption, open a JIRA, submit a PR, and test out the change.
>
> On Tue, Jul 26, 2016 at 10:44 AM Josh Wills <[email protected]> wrote:
>
>> Hey Ben,
>>
>> I suspect it was just a mistake-- it looks like we were consolidating
>> some common patterns in the code that had different use cases (i.e.,
>> defensive copies on reads vs. not doing that on sorts.) Do you suspect the
>> fix is as easy as reverting to the KeyValue constructor here?
>>
>> J
>>
>> On Mon, Jul 25, 2016 at 3:17 PM, Ben Roling <[email protected]> wrote:
>>
>>> We've got a Crunch pipeline that does an HBase bulk load that we're
>>> migrating from a CDH4 to a CDH5 cluster. In the process of porting the
>>> job, the job has gone from using crunch-0.8.4 to using crunch-0.11.0.
>>> We've noticed the job is dramatically slower when it runs in our CDH5
>>> cluster and I've been doing some digging to try to figure out why.
>>>
>>> The first job in the pipeline spits a PCollection<KeyValue> out to HDFS
>>> and that job seems to behave fine. The next job uses
>>> HFileUtils.writeToHFilesForIncrementalLoad to read that PCollection and
>>> generate the HFiles for the bulk load. In the map phase of the instance of
>>> that job that I am debugging, there are 1168 map tasks. Nearly all of the
>>> map tasks complete in 2 minutes or less per task. There are 2 exceptional
>>> tasks. One takes 37 minutes and the other takes *11.5 hours*. The
>>> input and output records and bytes counters on the tasks don't quickly
>>> explain the problem. There are other tasks with larger input and output
>>> that completed in 2 minutes or less.
>>>
>>> The long running tasks were always stuck here:
>>>
>>> 2016-07-21 22:55:33,544 INFO [main] org.apache.hadoop.mapred.Merger: Down to the last merge-pass, with 4 segments left of total size: 72987232 bytes
>>>
>>> The next output after it finally finishes 11.5 hours later is:
>>>
>>> 2016-07-22 10:30:52,594 INFO [main] org.apache.hadoop.mapred.Merger: Merging 4 sorted segments
>>>
>>> I gathered some stack samples and hooked up remote debugging to try to
>>> figure out what was going on to make the Merger take so long. The stack
>>> traces always looked like this:
>>>
>>> "main@1" prio=5 tid=0x1 nid=NA runnable
>>>   java.lang.Thread.State: RUNNABLE
>>>     at org.apache.hadoop.hbase.KeyValue.create(KeyValue.java:2420)
>>>     at org.apache.hadoop.hbase.KeyValue.create(KeyValue.java:2401)
>>>     at org.apache.crunch.io.hbase.HBaseTypes.bytesToKeyValue(HBaseTypes.java:127)
>>>     at org.apache.crunch.io.hbase.HFileUtils$KeyValueComparator.compare(HFileUtils.java:257)
>>>     at org.apache.hadoop.mapred.Merger$MergeQueue.lessThan(Merger.java:587)
>>>     at org.apache.hadoop.util.PriorityQueue.downHeap(PriorityQueue.java:141)
>>>     at org.apache.hadoop.util.PriorityQueue.adjustTop(PriorityQueue.java:108)
>>>     at org.apache.hadoop.mapred.Merger$MergeQueue.adjustPriorityQueue(Merger.java:524)
>>>     at org.apache.hadoop.mapred.Merger$MergeQueue.next(Merger.java:547)
>>>     at org.apache.hadoop.mapred.Merger.writeFile(Merger.java:209)
>>>     at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.mergeParts(MapTask.java:1911)
>>>     at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.flush(MapTask.java:1507)
>>>     at org.apache.hadoop.mapred.MapTask$NewOutputCollector.close(MapTask.java:723)
>>>     at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:793)
>>>     at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341)
>>>     at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:163)
>>>     at java.security.AccessController.doPrivileged(AccessController.java:-1)
>>>     at javax.security.auth.Subject.doAs(Subject.java:422)
>>>     at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1671)
>>>     at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158)
>>>
>>> In the debugger, I can see the KeyValue.create() method is copying a
>>> 59MB byte array. Every time I pause the debugger, it seems to be copying
>>> that same 59MB byte array. I can step through the loop in
>>> Merger.writeFile() and see progress being made. When stepping through
>>> interactively it doesn’t seem slow, but I did some timing and found that it
>>> appears to be going at a rate of about 43 records per second. The task has
>>> 2,794,137 records to process, and as such that is very slow.
>>>
>>> Looking through the history of crunch, I see that
>>> HFileUtils$KeyValueComparator has changed between crunch-0.8.4 (used in the
>>> version of the job run with CDH4) and crunch-0.11 (used in the version of
>>> the job with CDH5). The following commit changed the behavior to invoke
>>> HBaseTypes.bytesToKeyValue:
>>>
>>> https://github.com/apache/crunch/commit/a959ee6c7fc400d1f455b0742641c54de1dec0bf#diff-bc76ce0b41704c9c4efbfa1aab53588d
>>>
>>> Previously, new KeyValue(byte[], int, int) was invoked, which does not
>>> do a byte array copy.
>>>
>>> I think this byte array copy is the cause of the dramatic slowdown. I
>>> suspect the order of the KeyValue pairs in the data must play a factor. My
>>> guess is the data is ordered in such a way where a large HBase KeyValue
>>> (59MB) is placed in such a way that the code must repeatedly do comparisons
>>> of other KeyValues to that KeyValue. Every time the comparator is invoked,
>>> it does this deep copy of the KeyValue and since the KeyValue is large, the
>>> copy is really expensive.
>>>
>>> Has anyone noticed this problem before? Is the byte array copy
>>> intentional and necessary? Looking at master it seems like it is still
>>> there in the latest code.
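
For anyone skimming the thread later, here is a rough sketch of the difference being discussed: a comparator that deep-copies its inputs on every call versus one that compares the backing arrays in place. It deliberately avoids the HBase and Crunch APIs; the class ComparatorCopyDemo and its methods are hypothetical stand-ins, and the 8 MB array is only an assumed placeholder for one unusually large record.

```java
import java.util.Arrays;

// Hypothetical demo class; it is not part of Crunch or HBase.
class ComparatorCopyDemo {
    // Running total of bytes duplicated by the copying comparator.
    static long bytesCopied = 0;

    // Unsigned lexicographic comparison over two array slices.
    // Works directly on the backing arrays and allocates nothing.
    static int compareInPlace(byte[] a, int aOff, int aLen,
                              byte[] b, int bOff, int bLen) {
        int n = Math.min(aLen, bLen);
        for (int i = 0; i < n; i++) {
            int x = a[aOff + i] & 0xff;
            int y = b[bOff + i] & 0xff;
            if (x != y) {
                return x - y;
            }
        }
        return aLen - bLen;
    }

    // Same ordering, but deep-copies both slices before comparing.
    // The cost of every call grows with the size of the records.
    static int compareWithCopy(byte[] a, int aOff, int aLen,
                               byte[] b, int bOff, int bLen) {
        byte[] ca = Arrays.copyOfRange(a, aOff, aOff + aLen);
        byte[] cb = Arrays.copyOfRange(b, bOff, bOff + bLen);
        bytesCopied += (long) aLen + bLen;
        return compareInPlace(ca, 0, ca.length, cb, 0, cb.length);
    }

    public static void main(String[] args) {
        byte[] large = new byte[8 * 1024 * 1024]; // assumed stand-in for one huge record
        byte[] small = {1, 2, 3};
        int comparisons = 100;
        for (int i = 0; i < comparisons; i++) {
            compareWithCopy(large, 0, large.length, small, 0, small.length);
        }
        System.out.println("bytes copied across " + comparisons
                + " comparisons: " + bytesCopied);
    }
}
```

Both methods return the same ordering, so the only difference is the allocation and copy on each call. In a merge, the record at the top of the priority queue can be compared many times, which is why one very large record could dominate the run time if the comparator copies it on each comparison.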
