[
https://issues.apache.org/jira/browse/MAPREDUCE-6067?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14121306#comment-14121306
]
Sean Zhong commented on MAPREDUCE-6067:
---------------------------------------
Hi Binglin,
Thanks for your patch. I have some questions about the latest version:
{quote}
void MCollectorOutputHandler::handleInput(ByteBuffer & in) {
char * buff = in.current();
uint32_t length = in.remain();
const char * end = buff + length;
char * pos = buff;
if (_kvContainer.remain() > 0) {
uint32_t filledLength = _kvContainer.fill(pos, length);
pos += filledLength;
}
while (end - pos > 0) {
KVBufferWithParititionId * kvBuffer = (KVBufferWithParititionId *)pos;
if (unlikely(end - pos < KVBuffer::headerLength())) {
THROW_EXCEPTION(IOException, "k/v meta information incomplete");
}
if (_endium == LARGE_ENDIUM) {
kvBuffer->partitionId = bswap(kvBuffer->partitionId);
kvBuffer->buffer.keyLength = bswap(kvBuffer->buffer.keyLength);
kvBuffer->buffer.valueLength = bswap(kvBuffer->buffer.valueLength);
}
uint32_t kvLength = kvBuffer->buffer.length();
KVBuffer * dest = allocateKVBuffer(kvBuffer->partitionId, kvLength);
_kvContainer.wrap((char *)dest, kvLength);
pos += 4; //skip the partition length
uint32_t filledLength = _kvContainer.fill(pos, end - pos);
pos += filledLength;
+ _mapOutputRecords->increase();
+ uint32_t outputSize = kvLength-KVBuffer::headerLength();
+ _mapOutputBytes->increase(outputSize);
}
}
{quote}
The newly added lines sit on the performance-critical path. Might it be risky
to change the code here?
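If the per-record counter updates do turn out to be measurable, one possible
way to keep them cheap is to accumulate locally and touch the counters once per
batch. This is only a minimal sketch: Counter below is a simplified stand-in
(not the real native-task class), and countBatch and its parameters are
hypothetical names used for illustration.

```cpp
#include <cstddef>
#include <cstdint>
#include <vector>

// Simplified stand-in for a counter; NOT the real native-task Counter.
class Counter {
public:
  Counter() : _count(0) {}
  void increase(uint64_t delta = 1) { _count += delta; }
  uint64_t get() const { return _count; }
private:
  uint64_t _count;
};

// Hypothetical helper: sum the payload sizes of one input batch in a local
// variable, then update each counter exactly once instead of once per record.
void countBatch(const std::vector<uint32_t> & payloadSizes,
                Counter & records, Counter & bytes) {
  uint64_t batchBytes = 0;
  for (size_t i = 0; i < payloadSizes.size(); i++) {
    batchBytes += payloadSizes[i];
  }
  records.increase(payloadSizes.size());
  bytes.increase(batchBytes);
}
```

Whether this is worth doing depends on how expensive increase() really is, so
it should be benchmarked before the loop is restructured.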
{quote}
+ Counter * _mapOutputRecords;
+ Counter * _mapOutputBytes;
{quote}
Neither of these two fields is initialized in the constructor. That will
trigger compile warnings, which we put great effort into eliminating in
MAPREDUCE-5977. There are many similar issues in other files. Please make sure
that every newly added field is initialized in the constructor, in the same
order as the field definitions; otherwise there will be GCC warnings.
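As a minimal sketch of what I mean (the class name OutputHandlerSketch and the
configure()/configured() helpers are hypothetical, and Counter is a simplified
stand-in rather than the real class), the initializer list should name the
fields in the same order in which they are declared:

```cpp
#include <cstddef>
#include <cstdint>

// Simplified stand-in for a counter; NOT the real native-task Counter.
class Counter {
public:
  Counter() : _count(0) {}
  void increase(uint64_t delta = 1) { _count += delta; }
  uint64_t get() const { return _count; }
private:
  uint64_t _count;
};

// Hypothetical handler showing only the two new fields.
class OutputHandlerSketch {
public:
  // Initializer list follows the declaration order of the fields below,
  // so GCC's -Wreorder check has nothing to complain about.
  OutputHandlerSketch()
      : _mapOutputRecords(NULL),
        _mapOutputBytes(NULL) {}

  // Hypothetical setter: the real code obtains its counters elsewhere.
  void configure(Counter * records, Counter * bytes) {
    _mapOutputRecords = records;
    _mapOutputBytes = bytes;
  }

  bool configured() const {
    return _mapOutputRecords != NULL && _mapOutputBytes != NULL;
  }

private:
  Counter * _mapOutputRecords;  // declared first, initialized first
  Counter * _mapOutputBytes;    // declared second, initialized second
};
```

Initializing the pointers to NULL also means that a use before setup fails in
an obvious way instead of reading garbage.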
{quote}
- const uint64_t M = 1000000; //million
- LOG("[MapOutputCollector::final_merge_and_spill] Spilling file path: %s",
filepath.c_str());
{quote}
Was this log removed because it is too noisy? It was added after real pain in
troubleshooting, and it gives important information when spilling. I also
noticed that some other log messages were removed; do we have enough reason
for this?
{quote}
- } else {
- LOG("MemoryPool is full, fail to allocate new MemBlock, block size:
%d, kv length: %d", expect, kvLength);
{quote}
I can understand why you removed these two lines. But they also helped when
troubleshooting a real bug in which the KV length memory was corrupted: the LOG
printed a huge kvLength value, which really helped in tracking the problem
down.
{quote}
+ assertEquals(reason, true, compareRet);
+ ResultVerifier.verifyCounters(normaljob, nativejob);
+ }
fs.close();
{quote}
I am OK with the change as long as all regression tests pass.
In KVTest.java:
{quote}
}
+ Job normalJob;
+ Job nativeJob;
+
@Test
public void testKVCompability() throws Exception {
{quote}
Can we make normalJob and nativeJob local variables instead of fields? Since
this is a test file, test cases should share nothing except immutable things
defined in test setup.
{quote}
- if(compareRet){
- final FileSystem fs = FileSystem.get(hadoopkvtestconf);
- fs.delete(new Path(nativeoutput), true);
- fs.delete(new Path(normaloutput), true);
- fs.delete(new Path(input), true);
- fs.close();
- }
{quote}
With the cleanup code deleted, have you confirmed that it will not leak any
garbage files on local disk?
{quote}
@AfterClass
@@ -150,6 +148,7 @@ private String runNativeTest(String jobname, Class<?>
keyclass, Class<?> valuecl
nativekvtestconf.set(TestConstants.NATIVETASK_KVTEST_CREATEFILE, "true");
final KVJob keyJob = new KVJob(jobname, nativekvtestconf, keyclass,
valueclass, inputpath, outputpath);
assertTrue("job should complete successfully", keyJob.runJob());
+ nativeJob = keyJob.job;
return outputpath;
}
{quote}
This line of the change is confusing to read.
{quote}
+ Counters normalCounters = normalJob.getCounters();
+ Counters nativeCounters = nativeJob.getCounters();
+ assertEquals(
+ normalCounters.findCounter(TaskCounter.MAP_OUTPUT_RECORDS).getValue(),
+ nativeCounters.findCounter(TaskCounter.MAP_OUTPUT_RECORDS).getValue());
+ assertEquals(
+ normalCounters.findCounter(TaskCounter.REDUCE_INPUT_GROUPS).getValue(),
+ nativeCounters.findCounter(TaskCounter.REDUCE_INPUT_GROUPS).getValue());
+ assertEquals(
+ normalCounters.findCounter(TaskCounter.REDUCE_INPUT_RECORDS).getValue(),
+ nativeCounters.findCounter(TaskCounter.REDUCE_INPUT_RECORDS).getValue());
{quote}
Maybe we can add a message to each assert, so that when the values are not
equal we know which counter differs. Like this:
assertEquals(msg, a, b)
This also applies to the other newly added asserts in the patch.
{quote}
+ Counter * materializedBytes =
NativeObjectFactory::GetCounter(TaskCounters::TASK_COUNTER_GROUP,
+ TaskCounters::MAP_OUTPUT_MATERIALIZED_BYTES);
{quote}
I cannot find where this counter is used.
Thanks.
Basically I think the patch is good. Since we are getting close to the merge,
maybe we should be conservative about less important changes such as log
messages, and be careful when changing logic on the performance-critical path.
> native-task: spilled records counter is incorrect
> -------------------------------------------------
>
> Key: MAPREDUCE-6067
> URL: https://issues.apache.org/jira/browse/MAPREDUCE-6067
> Project: Hadoop Map/Reduce
> Issue Type: Sub-task
> Components: task
> Reporter: Todd Lipcon
> Assignee: Binglin Chang
> Attachments: MAPREDUCE-6067.v1.patch, MAPREDUCE-6067.v2.patch,
> MAPREDUCE-6067.v3.patch, MAPREDUCE-6067.v4.patch, native-counters.html,
> trunk-counters.html
>
>
> After running a terasort, I see the spilled records counter at 5028651606,
> which is about half what I expected to see. Using the non-native collector I
> see the expected count of 10000000000. It seems the correct number of records
> were indeed spilled, because the job's output record count is correct.