Where did you need to invoke #init()? That shouldn't be necessary, except
maybe for specific Inputs.
The processor should not need to invoke initialize or close on Inputs /
Outputs. "start()" is something that was added to allow Processors to skip
Inputs if required.

> fwiw, if i'm responsible for any of the lifecycle of an object, should
> probably be responsible for all of it.

I'd agree with that, and start(), which was added later, breaks this.
There's a jira open to separate framework and user components for Inputs /
Outputs. I think giving the Processor full control over the lifecycle of
Inputs / Outputs is something that we need to think about as well.
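
Until that is sorted out, one way an Output could tolerate close() being
called by both the processor and the framework is a simple guard. This is
only a rough sketch of the idea, not the actual Tez code: GuardedOutput and
flushCount are made-up names for illustration, and the real flush / merge
work is reduced to a counter.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for an Output; this is NOT OnFileSortedOutput.
class GuardedOutput {
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private int flushCount = 0;

    // Safe to call more than once: only the first call does the work.
    void close() {
        if (!closed.compareAndSet(false, true)) {
            return; // already closed, nothing left to do
        }
        flushCount++; // placeholder for the real flush / merge work
    }

    // Number of times the flush work actually ran.
    int flushCount() {
        return flushCount;
    }
}
```

With a guard like this, a second close() becomes a no-op instead of
triggering another flush.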

#waitForAllInputsReady(), and the equivalent method which accepts a list
of Inputs, are helpers that let processors know when specific Inputs are
ready. A processor can either block on the method in the main execution
thread, or do something else in the meantime and set up a notification
mechanism by blocking on these methods in a separate thread.
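
The separate-thread option could look roughly like the sketch below. It is
an assumption-laden illustration rather than Tez code: ReadinessContext is a
made-up interface standing in for whatever object exposes
#waitForAllInputsReady(), and InputReadyNotifier / notifyWhenReady are
hypothetical names.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for the object that exposes the blocking helper.
interface ReadinessContext {
    void waitForAllInputsReady() throws InterruptedException;
}

class InputReadyNotifier {
    // Blocks on the helper in a background thread. The returned future
    // completes once the helper returns, i.e. once all Inputs are ready.
    static CompletableFuture<Void> notifyWhenReady(ReadinessContext context) {
        return CompletableFuture.runAsync(() -> {
            try {
                context.waitForAllInputsReady();
            } catch (InterruptedException e) {
                // Restore the interrupt flag and fail the future.
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting for inputs", e);
            }
        });
    }
}
```

The processor can then carry on with other work and attach a callback to the
returned future (thenRun, for example), or call join() at the point where it
actually needs the Inputs. runAsync uses the common pool by default, so a
dedicated executor may be the better choice if the wait can be long.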


On Sat, May 17, 2014 at 10:19 AM, Chris K Wensel <[email protected]> wrote:

> ah, ok. will give that a shot.
>
> fwiw, if i'm responsible for any of the lifecycle of an object, should
> probably be responsible for all of it.
>
> so maybe #start() can be pulled out (though it seems unimplemented). I
> also found an #init() i have to call too before i call #start(). (and now I
> ignore #close(), which makes me queasy).
>
> then of course there is the #waitForAllInputsReady() i'm unsure about...
>
> ckw
>
> On May 17, 2014, at 2:55 AM, Siddharth Seth <[email protected]> wrote:
>
> It looks like close() is being invoked on the Output from within the
> processor. Tez closes Inputs and Outputs after a processor completes - so
> it ends up getting invoked twice, which is likely what is causing the error
> - OnFileSortedOutput doesn't handle this very well. Expected usage is to
> let the framework take care of closing the Inputs and Outputs. We could
> likely improve this though by making sure the existing I/Os handle such
> usage better - will create a jira for that if you can confirm that this is
> what is happening.
>
>
> On Fri, May 16, 2014 at 8:17 PM, Chris K Wensel <[email protected]> wrote:
>
>> found this..
>>
>> 2014-05-16 20:07:12,237 INFO  dflt.DefaultSorter
>> (DefaultSorter.java:flush(584)) - Starting flush of map output
>> 2014-05-16 20:07:12,237 INFO  dflt.DefaultSorter
>> (DefaultSorter.java:flush(602)) - Sorting & Spilling map output
>> 2014-05-16 20:07:12,237 INFO  dflt.DefaultSorter
>> (DefaultSorter.java:flush(603)) - bufstart = 0; bufend = 210; bufvoid =
>> 104857600
>> 2014-05-16 20:07:12,237 INFO  dflt.DefaultSorter
>> (DefaultSorter.java:flush(605)) - kvstart = 26214396(104857584); kvend =
>> 26214360(104857440); length = 37/6553600
>> *2014-05-16 20:07:12,264 INFO  dflt.DefaultSorter
>> (DefaultSorter.java:spill(819)) - Finished spill 0*
>> *2014-05-16 20:07:12,312 INFO  mapred.YarnTezDagChild
>> (YarnTezDagChild.java:run(582)) - Closing task,
>> taskAttemptId=attempt_1400296015141_0001_1_00_000000_0*
>> 2014-05-16 20:07:12,312 INFO  tez.FlowProcessor
>> (FlowProcessor.java:close(198)) - in close
>> 2014-05-16 20:07:12,312 INFO  dflt.DefaultSorter
>> (DefaultSorter.java:flush(584)) - Starting flush of map output
>> 2014-05-16 20:07:12,312 INFO  dflt.DefaultSorter
>> (DefaultSorter.java:resetSpill(345)) - (RESET) equator 0 kv
>> 26214396(104857584) kvi 26214356(104857424)
>> 2014-05-16 20:07:12,312 INFO  dflt.DefaultSorter
>> (DefaultSorter.java:flush(602)) - Sorting & Spilling map output
>> 2014-05-16 20:07:12,312 INFO  dflt.DefaultSorter
>> (DefaultSorter.java:flush(603)) - bufstart = 0; bufend = 210; bufvoid =
>> 104857600
>> 2014-05-16 20:07:12,312 INFO  dflt.DefaultSorter
>> (DefaultSorter.java:flush(605)) - kvstart = 26214396(104857584); kvend =
>> 26214360(104857440); length = 37/6553600
>> 2014-05-16 20:07:12,314 INFO  dflt.DefaultSorter
>> (DefaultSorter.java:spill(819)) - Finished spill 1
>> 2014-05-16 20:07:12,319 INFO  runtime.LogicalIOProcessorRuntimeTask
>> (LogicalIOProcessorRuntimeTask.java:cleanup(661)) - Final Counters :
>> Counters: 16 [[org.apache.tez.common.counters.TaskCounter
>> SPILLED_RECORDS=20, INPUT_RECORDS_PROCESSED=0, OUTPUT_RECORDS=10,
>> OUTPUT_BYTES=210, OUTPUT_BYTES_WITH_OVERHEAD=0, OUTPUT_BYTES_PHYSICAL=236,
>> ADDITIONAL_SPILLS_BYTES_WRITTEN=236, ADDITIONAL_SPILLS_BYTES_READ=0,
>> ADDITIONAL_SPILL_COUNT=1][cascading.flow.SliceCounters
>> Process_Begin_Time=1400296031710, Process_End_Time=1400296032311,
>> Read_Duration=145, Tuples_Read=10, Tuples_Written=10,
>> Write_Duration=0][cascading.flow.StepCounters Tuples_Read=10]]
>> 2014-05-16 20:07:12,320 ERROR security.UserGroupInformation
>> (UserGroupInformation.java:doAs(1494)) - PriviledgedActionException
>> as:cwensel (auth:SIMPLE)
>> cause:org.apache.hadoop.util.DiskChecker$DiskErrorException: Could not find
>> attempt_1400296015141_0001_1_00_000000_0_10002_spill_0.out in any of the
>> configured local directories
>> 2014-05-16 20:07:12,320 INFO  mapred.YarnTezDagChild
>> (YarnTezDagChild.java:run(170)) - Heartbeat thread interrupted.  stopped:
>> true error: false
>> 2014-05-16 20:07:12,320 INFO  mapred.YarnTezDagChild
>> (YarnTezDagChild.java:run(179)) - Current task marked as complete. Stopping
>> heartbeat thread and allowing normal container shutdown
>> 2014-05-16 20:07:12,321 FATAL mapred.YarnTezDagChild
>> (YarnTezDagChild.java:main(664)) - Error running child :
>> org.apache.hadoop.util.DiskChecker$DiskErrorException: Could not find
>> attempt_1400296015141_0001_1_00_000000_0_10002_spill_0.out in any of the
>> configured local directories
>>  at
>> org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.getLocalPathToRead(LocalDirAllocator.java:445)
>> at
>> org.apache.hadoop.fs.LocalDirAllocator.getLocalPathToRead(LocalDirAllocator.java:164)
>>  at
>> org.apache.tez.runtime.library.common.task.local.output.TezTaskOutputFiles.getSpillFile(TezTaskOutputFiles.java:168)
>> at
>> org.apache.tez.runtime.library.common.sort.impl.dflt.DefaultSorter.mergeParts(DefaultSorter.java:988)
>>  at
>> org.apache.tez.runtime.library.common.sort.impl.dflt.DefaultSorter.flush(DefaultSorter.java:633)
>> at
>> org.apache.tez.runtime.library.output.OnFileSortedOutput.close(OnFileSortedOutput.java:124)
>>  at
>> org.apache.tez.runtime.LogicalIOProcessorRuntimeTask.close(LogicalIOProcessorRuntimeTask.java:331)
>> at
>> org.apache.hadoop.mapred.YarnTezDagChild$5.run(YarnTezDagChild.java:584)
>>  at java.security.AccessController.doPrivileged(Native Method)
>> at javax.security.auth.Subject.doAs(Subject.java:415)
>>  at
>> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1491)
>> at org.apache.hadoop.mapred.YarnTezDagChild.main(YarnTezDagChild.java:570)
>>
>> On May 16, 2014, at 5:28 PM, Siddharth Seth <[email protected]> wrote:
>>
>> Chris,
>> Is this running multiple local-dirs in YARN, and are they on different
>> disks ? This should only show up under normal operation if a disk goes bad.
>> Do you see the following log line in the logs "Finished spill 0".
>> Otherwise, would need access to more logs to figure out what's going on.
>> Also, is this easily reproducible ?
>>
>> Thanks
>> - Sid
>>
>>
>> On Fri, May 16, 2014 at 4:06 PM, Chris K Wensel <[email protected]> wrote:
>>
>>> Hey all
>>>
>>> I'm sure I'll sort this out 2 min after sending this email..
>>>
>>> but i'm getting the following exception on a simple scatter/gather, bits
>>> below..
>>>
>>> the salient piece is that its looking for:
>>>
>>> Could not find attempt_1400280851085_0001_1_00_000000_0_10003_spill_0.out
>>>
>>> but i can only find *_1.out in
>>> ../usercache/user/appcache/application_.../
>>>
>>> not a *_0.out.
>>>
>>> i'm on commit:
>>>
>>> * c247a3b - (HEAD, apache-github/master, apache-github/HEAD, 
>>> master)TEZ-1102. Abstract out connection management logic in shuffle code.
>>> Contributed by Rajesh Balamohan. (3 days ago)
>>>
>>>
>>>       outputClassName = OnFileSortedOutput.class.getName();
>>>       inputClassName = ShuffledMergedInputLegacy.class.getName();
>>>
>>>       movementType = EdgeProperty.DataMovementType.SCATTER_GATHER;
>>>       sourceType = EdgeProperty.DataSourceType.PERSISTED;
>>>       schedulingType = EdgeProperty.SchedulingType.SEQUENTIAL;
>>>
>>>
>>> 2014-05-16 15:54:40,929 INFO [AsyncDispatcher event handler]
>>> org.apache.tez.dag.history.HistoryEventHandler:
>>> [HISTORY][DAG:dag_1400280851085_0001_1][Event:DAG_FINISHED]:
>>> dagId=dag_1400280851085_0001_1, startTime=1400280860264,
>>> finishTime=1400280880908, timeTaken=20644, status=FAILED,
>>> diagnostics=Vertex failed, vertexName=BBE3E81575B143109A08968E135658C5,
>>> vertexId=vertex_1400280851085_0001_1_00, diagnostics=[Task failed,
>>> taskId=task_1400280851085_0001_1_00_000000,
>>> diagnostics=[AttemptID:attempt_1400280851085_0001_1_00_000000_0 Info:Error:
>>> org.apache.hadoop.util.DiskChecker$DiskErrorException: *Could not find
>>> attempt_1400280851085_0001_1_00_000000_0_10003_spill_0.out* in any of
>>> the configured local directories
>>>  at
>>> org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.getLocalPathToRead(LocalDirAllocator.java:445)
>>> at
>>> org.apache.hadoop.fs.LocalDirAllocator.getLocalPathToRead(LocalDirAllocator.java:164)
>>>  at
>>> org.apache.tez.runtime.library.common.task.local.output.TezTaskOutputFiles.getSpillFile(TezTaskOutputFiles.java:168)
>>> at
>>> org.apache.tez.runtime.library.common.sort.impl.dflt.DefaultSorter.mergeParts(DefaultSorter.java:988)
>>>  at
>>> org.apache.tez.runtime.library.common.sort.impl.dflt.DefaultSorter.flush(DefaultSorter.java:633)
>>> at
>>> org.apache.tez.runtime.library.output.OnFileSortedOutput.close(OnFileSortedOutput.java:124)
>>>  at
>>> org.apache.tez.runtime.LogicalIOProcessorRuntimeTask.close(LogicalIOProcessorRuntimeTask.java:331)
>>> at
>>> org.apache.hadoop.mapred.YarnTezDagChild$5.run(YarnTezDagChild.java:584)
>>>  at java.security.AccessController.doPrivileged(Native Method)
>>> at javax.security.auth.Subject.doAs(Subject.java:415)
>>>  at
>>> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1491)
>>> at
>>> org.apache.hadoop.mapred.YarnTezDagChild.main(YarnTezDagChild.java:570)
>>>
>>>
>>>     --
>>> Chris K Wensel
>>> [email protected]
>>> http://concurrentinc.com
>>>
>>>
>>
>>     --
>> Chris K Wensel
>> [email protected]
>> http://concurrentinc.com
>>
>>
>
> --
> Chris K Wensel
> [email protected]
> http://concurrentinc.com
>
>
