Where did you need to invoke #init()? That shouldn't be necessary, except perhaps for specific Inputs. The Processor should not need to invoke initialize or close on Inputs/Outputs. start() was added to allow Processors to skip Inputs if required.
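To make the ownership rule concrete, here is a minimal, self-contained Java sketch. It does not use the Tez API: `SketchOutput`, `FragileOutput`, `GuardedOutput`, and `runTask` are hypothetical names. It models the sequence described in this thread. The framework closes Outputs after the Processor completes, so a Processor that also calls close() causes a second close. `FragileOutput` loosely imitates the failure in the logs, where the second close writes spill 1 and then cannot find spill 0. `GuardedOutput` shows one possible way an Output could tolerate repeated close() calls.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical model of the Input/Output lifecycle discussed in this thread.
// None of these types are part of the Tez API.
public class LifecycleSketch {

    /** Minimal stand-in for an Output whose lifecycle the framework owns. */
    public interface SketchOutput {
        void initialize();
        void write(String record);
        void close();
    }

    /**
     * Loosely imitates the failure in the logs: each close() records a new
     * spill, checks that spill_0 exists before "merging", then deletes the
     * spills. A second close() therefore records spill_1 and finds no spill_0.
     */
    public static class FragileOutput implements SketchOutput {
        private int spillCount = 0;
        private final Set<String> spillFiles = new HashSet<>();

        @Override public void initialize() { }

        @Override public void write(String record) { /* records are ignored in this sketch */ }

        @Override public void close() {
            spillFiles.add("spill_" + spillCount++);
            if (!spillFiles.contains("spill_0")) {
                throw new IllegalStateException("Could not find spill_0");
            }
            spillFiles.clear(); // merge finished; spill files removed
        }
    }

    /** Wraps an Output so that only the first close() reaches the delegate. */
    public static class GuardedOutput implements SketchOutput {
        private final SketchOutput delegate;
        private final AtomicBoolean closed = new AtomicBoolean(false);

        public GuardedOutput(SketchOutput delegate) { this.delegate = delegate; }

        @Override public void initialize() { delegate.initialize(); }

        @Override public void write(String record) { delegate.write(record); }

        @Override public void close() {
            if (closed.compareAndSet(false, true)) {
                delegate.close();
            }
        }
    }

    /**
     * Simulated framework: initializes the Output, lets the "Processor" write,
     * and always closes the Output itself once the Processor is done.
     */
    public static void runTask(SketchOutput output, boolean processorAlsoCloses) {
        output.initialize();
        output.write("record");   // Processor work
        if (processorAlsoCloses) {
            output.close();       // the pattern this thread advises against
        }
        output.close();           // framework-owned close after the Processor completes
    }

    public static void main(String[] args) {
        runTask(new FragileOutput(), false);                    // framework-only close
        runTask(new GuardedOutput(new FragileOutput()), true);  // repeated close tolerated
        try {
            runTask(new FragileOutput(), true);                 // unguarded double close
        } catch (IllegalStateException e) {
            System.out.println("double close failed: " + e.getMessage());
        }
    }
}
```

The framework-only path and the guarded path complete normally, while the unguarded double close throws, which mirrors the shape of the `DiskErrorException` in the logs. Guarding inside the Output is only one option; the expected usage described here is simply not to call close() from the Processor.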
> fwiw, if i'm responsible for any of the lifecycle of an object, should
> probably be responsible for all of it.

I'd agree with that, and start(), which was added later, breaks this. There's a JIRA open to separate framework and user components for Inputs/Outputs. I think giving the Processor full control over the lifecycle of Inputs/Outputs is something we need to think about as well.

#waitForAllInputsReady() and the equivalent method that accepts a list of Inputs are helpers that let Processors know when specific Inputs are ready. A Processor could block on this method in the main execution thread, or do something else meanwhile by setting up a notification mechanism that blocks on these methods in a separate thread.

On Sat, May 17, 2014 at 10:19 AM, Chris K Wensel <[email protected]> wrote:

> ah, ok. will give that a shot.
>
> fwiw, if i'm responsible for any of the lifecycle of an object, should
> probably be responsible for all of it.
>
> so maybe #start() can be pulled out (though it seems unimplemented). I
> also found an #init() i have to call too before i call #start(). (and now I
> ignore #close(), which makes me queasy).
>
> then of course there is the #waitForAllInputsReady() i'm unsure about...
>
> ckw
>
> On May 17, 2014, at 2:55 AM, Siddharth Seth <[email protected]> wrote:
>
> It looks like close() is being invoked on the Output from within the
> processor. Tez closes Inputs and Outputs after a processor completes - so
> it ends up getting invoked twice, which is likely what is causing the error
> - OnFileSortedOutput doesn't handle this very well. Expected usage is to
> let the framework take care of closing the Inputs and Outputs. We could
> likely improve this though by making sure the existing I/Os handle such
> usage better - will create a jira for that if you can confirm that this is
> what is happening.
>
>
> On Fri, May 16, 2014 at 8:17 PM, Chris K Wensel <[email protected]> wrote:
>
>> found this..
>>
>> 2014-05-16 20:07:12,237 INFO dflt.DefaultSorter (DefaultSorter.java:flush(584)) - Starting flush of map output
>> 2014-05-16 20:07:12,237 INFO dflt.DefaultSorter (DefaultSorter.java:flush(602)) - Sorting & Spilling map output
>> 2014-05-16 20:07:12,237 INFO dflt.DefaultSorter (DefaultSorter.java:flush(603)) - bufstart = 0; bufend = 210; bufvoid = 104857600
>> 2014-05-16 20:07:12,237 INFO dflt.DefaultSorter (DefaultSorter.java:flush(605)) - kvstart = 26214396(104857584); kvend = 26214360(104857440); length = 37/6553600
>> *2014-05-16 20:07:12,264 INFO dflt.DefaultSorter (DefaultSorter.java:spill(819)) - Finished spill 0*
>> *2014-05-16 20:07:12,312 INFO mapred.YarnTezDagChild (YarnTezDagChild.java:run(582)) - Closing task, taskAttemptId=attempt_1400296015141_0001_1_00_000000_0*
>> 2014-05-16 20:07:12,312 INFO tez.FlowProcessor (FlowProcessor.java:close(198)) - in close
>> 2014-05-16 20:07:12,312 INFO dflt.DefaultSorter (DefaultSorter.java:flush(584)) - Starting flush of map output
>> 2014-05-16 20:07:12,312 INFO dflt.DefaultSorter (DefaultSorter.java:resetSpill(345)) - (RESET) equator 0 kv 26214396(104857584) kvi 26214356(104857424)
>> 2014-05-16 20:07:12,312 INFO dflt.DefaultSorter (DefaultSorter.java:flush(602)) - Sorting & Spilling map output
>> 2014-05-16 20:07:12,312 INFO dflt.DefaultSorter (DefaultSorter.java:flush(603)) - bufstart = 0; bufend = 210; bufvoid = 104857600
>> 2014-05-16 20:07:12,312 INFO dflt.DefaultSorter (DefaultSorter.java:flush(605)) - kvstart = 26214396(104857584); kvend = 26214360(104857440); length = 37/6553600
>> 2014-05-16 20:07:12,314 INFO dflt.DefaultSorter (DefaultSorter.java:spill(819)) - Finished spill 1
>> 2014-05-16 20:07:12,319 INFO runtime.LogicalIOProcessorRuntimeTask (LogicalIOProcessorRuntimeTask.java:cleanup(661)) - Final Counters : Counters: 16 [[org.apache.tez.common.counters.TaskCounter SPILLED_RECORDS=20, INPUT_RECORDS_PROCESSED=0, OUTPUT_RECORDS=10, OUTPUT_BYTES=210, OUTPUT_BYTES_WITH_OVERHEAD=0, OUTPUT_BYTES_PHYSICAL=236, ADDITIONAL_SPILLS_BYTES_WRITTEN=236, ADDITIONAL_SPILLS_BYTES_READ=0, ADDITIONAL_SPILL_COUNT=1][cascading.flow.SliceCounters Process_Begin_Time=1400296031710, Process_End_Time=1400296032311, Read_Duration=145, Tuples_Read=10, Tuples_Written=10, Write_Duration=0][cascading.flow.StepCounters Tuples_Read=10]]
>> 2014-05-16 20:07:12,320 ERROR security.UserGroupInformation (UserGroupInformation.java:doAs(1494)) - PriviledgedActionException as:cwensel (auth:SIMPLE) cause:org.apache.hadoop.util.DiskChecker$DiskErrorException: Could not find attempt_1400296015141_0001_1_00_000000_0_10002_spill_0.out in any of the configured local directories
>> 2014-05-16 20:07:12,320 INFO mapred.YarnTezDagChild (YarnTezDagChild.java:run(170)) - Heartbeat thread interrupted. stopped: true error: false
>> 2014-05-16 20:07:12,320 INFO mapred.YarnTezDagChild (YarnTezDagChild.java:run(179)) - Current task marked as complete. Stopping heartbeat thread and allowing normal container shutdown
>> 2014-05-16 20:07:12,321 FATAL mapred.YarnTezDagChild (YarnTezDagChild.java:main(664)) - Error running child : org.apache.hadoop.util.DiskChecker$DiskErrorException: Could not find attempt_1400296015141_0001_1_00_000000_0_10002_spill_0.out in any of the configured local directories
>>     at org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.getLocalPathToRead(LocalDirAllocator.java:445)
>>     at org.apache.hadoop.fs.LocalDirAllocator.getLocalPathToRead(LocalDirAllocator.java:164)
>>     at org.apache.tez.runtime.library.common.task.local.output.TezTaskOutputFiles.getSpillFile(TezTaskOutputFiles.java:168)
>>     at org.apache.tez.runtime.library.common.sort.impl.dflt.DefaultSorter.mergeParts(DefaultSorter.java:988)
>>     at org.apache.tez.runtime.library.common.sort.impl.dflt.DefaultSorter.flush(DefaultSorter.java:633)
>>     at org.apache.tez.runtime.library.output.OnFileSortedOutput.close(OnFileSortedOutput.java:124)
>>     at org.apache.tez.runtime.LogicalIOProcessorRuntimeTask.close(LogicalIOProcessorRuntimeTask.java:331)
>>     at org.apache.hadoop.mapred.YarnTezDagChild$5.run(YarnTezDagChild.java:584)
>>     at java.security.AccessController.doPrivileged(Native Method)
>>     at javax.security.auth.Subject.doAs(Subject.java:415)
>>     at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1491)
>>     at org.apache.hadoop.mapred.YarnTezDagChild.main(YarnTezDagChild.java:570)
>>
>> On May 16, 2014, at 5:28 PM, Siddharth Seth <[email protected]> wrote:
>>
>> Chris,
>> Is this running multiple local-dirs in YARN, and are they on different
>> disks ? This should only show up under normal operation if a disk goes bad.
>> Do you see the following log line in the logs "Finished spill 0".
>> Otherwise, would need access to more logs to figure out what's going on.
>> Also, is this easily reproducible ?
>>
>> Thanks
>> - Sid
>>
>>
>> On Fri, May 16, 2014 at 4:06 PM, Chris K Wensel <[email protected]> wrote:
>>
>>> Hey all
>>>
>>> I'm sure I'll sort this out 2 min after sending this email..
>>>
>>> but i'm getting the following exception on a simple scatter/gather, bits
>>> below..
>>>
>>> the salient piece is that it's looking for:
>>>
>>> Could not find attempt_1400280851085_0001_1_00_000000_0_10003_spill_0.out
>>>
>>> but i can only find *_1.out in
>>> ../usercache/user/appcache/application_.../
>>>
>>> not a *_0.out.
>>>
>>> i'm on commit:
>>>
>>> * c247a3b - (HEAD, apache-github/master, apache-github/HEAD, master) TEZ-1102. Abstract out connection management logic in shuffle code. Contributed by Rajesh Balamohan. (3 days ago)
>>>
>>>
>>> outputClassName = OnFileSortedOutput.class.getName();
>>> inputClassName = ShuffledMergedInputLegacy.class.getName();
>>>
>>> movementType = EdgeProperty.DataMovementType.SCATTER_GATHER;
>>> sourceType = EdgeProperty.DataSourceType.PERSISTED;
>>> schedulingType = EdgeProperty.SchedulingType.SEQUENTIAL;
>>>
>>>
>>> 2014-05-16 15:54:40,929 INFO [AsyncDispatcher event handler] org.apache.tez.dag.history.HistoryEventHandler: [HISTORY][DAG:dag_1400280851085_0001_1][Event:DAG_FINISHED]: dagId=dag_1400280851085_0001_1, startTime=1400280860264, finishTime=1400280880908, timeTaken=20644, status=FAILED, diagnostics=Vertex failed, vertexName=BBE3E81575B143109A08968E135658C5, vertexId=vertex_1400280851085_0001_1_00, diagnostics=[Task failed, taskId=task_1400280851085_0001_1_00_000000, diagnostics=[AttemptID:attempt_1400280851085_0001_1_00_000000_0 Info:Error: org.apache.hadoop.util.DiskChecker$DiskErrorException: *Could not find attempt_1400280851085_0001_1_00_000000_0_10003_spill_0.out* in any of the configured local directories
>>>     at org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.getLocalPathToRead(LocalDirAllocator.java:445)
>>>     at org.apache.hadoop.fs.LocalDirAllocator.getLocalPathToRead(LocalDirAllocator.java:164)
>>>     at org.apache.tez.runtime.library.common.task.local.output.TezTaskOutputFiles.getSpillFile(TezTaskOutputFiles.java:168)
>>>     at org.apache.tez.runtime.library.common.sort.impl.dflt.DefaultSorter.mergeParts(DefaultSorter.java:988)
>>>     at org.apache.tez.runtime.library.common.sort.impl.dflt.DefaultSorter.flush(DefaultSorter.java:633)
>>>     at org.apache.tez.runtime.library.output.OnFileSortedOutput.close(OnFileSortedOutput.java:124)
>>>     at org.apache.tez.runtime.LogicalIOProcessorRuntimeTask.close(LogicalIOProcessorRuntimeTask.java:331)
>>>     at org.apache.hadoop.mapred.YarnTezDagChild$5.run(YarnTezDagChild.java:584)
>>>     at java.security.AccessController.doPrivileged(Native Method)
>>>     at javax.security.auth.Subject.doAs(Subject.java:415)
>>>     at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1491)
>>>     at org.apache.hadoop.mapred.YarnTezDagChild.main(YarnTezDagChild.java:570)
>>>
>>>
>>> --
>>> Chris K Wensel
>>> [email protected]
>>> http://concurrentinc.com
>>>
>>
>> --
>> Chris K Wensel
>> [email protected]
>> http://concurrentinc.com
>>
>
> --
> Chris K Wensel
> [email protected]
> http://concurrentinc.com
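The two ways Sid describes for using #waitForAllInputsReady() (blocking in the main execution thread, or blocking in a separate thread to get a notification while doing other work) can be illustrated with plain `java.util.concurrent`. This is a hypothetical sketch, not the Tez API: `ReadinessSketch`, `markReady`, and `whenReady` are made-up names, Strings stand in for Input objects, and `markReady` stands in for however the framework signals that an Input is ready.

```java
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical model of a "wait until these Inputs are ready" helper.
// Not the Tez API; Strings stand in for Input objects.
public class ReadinessSketch {

    private final Set<String> ready = new HashSet<>();

    /** Simulated framework callback: records that an input is ready. */
    public synchronized void markReady(String input) {
        ready.add(input);
        notifyAll(); // wake any thread blocked in waitForAllInputsReady
    }

    /** Pattern 1: block the calling thread until all named inputs are ready. */
    public synchronized void waitForAllInputsReady(Collection<String> inputs)
            throws InterruptedException {
        while (!ready.containsAll(inputs)) {
            wait(); // releases the monitor; the loop guards against spurious wakeups
        }
    }

    /**
     * Pattern 2: do the blocking wait on another thread and expose the result
     * as a future, so the main thread can keep working and be notified later.
     */
    public CompletableFuture<Void> whenReady(Collection<String> inputs, ExecutorService pool) {
        return CompletableFuture.runAsync(() -> {
            try {
                waitForAllInputsReady(inputs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new CompletionException(e);
            }
        }, pool);
    }

    public static void main(String[] args) {
        ReadinessSketch tracker = new ReadinessSketch();
        ExecutorService pool = Executors.newSingleThreadExecutor();
        CompletableFuture<Void> both = tracker.whenReady(List.of("left", "right"), pool);

        // The main thread is free to do other work while the pool thread waits.
        tracker.markReady("left");
        tracker.markReady("right");

        both.join(); // returns once both inputs have been marked ready
        System.out.println("left and right are ready");
        pool.shutdown();
    }
}
```

Pattern 1 corresponds to blocking in the main execution thread; pattern 2 corresponds to building a notification mechanism by blocking in a separate thread. The `while` loop around `wait()` is the standard guard against spurious wakeups.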
