Yeah, that's exactly how it works. I'll have some time to look at this more closely later this morning. Would you mind opening a JIRA and/or letting me know which Hadoop version you're running against?
Thanks!
Josh

On Tue, Jun 25, 2013 at 7:46 AM, Florian Laws <[email protected]> wrote:
> Hi Josh,
>
> thanks for the quick reply.
>
> With pipeline.done(), there is still no content at the intended output
> path, and things get even weirder:
>
> The log output states
>
>     2013-06-25 16:41:12 FileOutputCommitter:173 [INFO] Saved output of
>     task 'attempt_local_0001_m_000000_0' to /tmp/crunch-1483549519/p1/output
>
> but the directory /tmp/crunch-1483549519 does not exist. It looks like
> this directory gets created temporarily during the run, but is removed
> again when the program finishes: it is kept around with run() and
> removed with done().
>
> Best,
>
> Florian
>
> On Tue, Jun 25, 2013 at 4:37 PM, Josh Wills <[email protected]> wrote:
> > Hey Florian,
> >
> > At first glance, it seems like a bug to me. I'm curious if the result
> > is any different if you swap in pipeline.done() for pipeline.run()?
> >
> > J
> >
> > On Tue, Jun 25, 2013 at 7:30 AM, Florian Laws <[email protected]> wrote:
> >> Hi all,
> >>
> >> I'm trying to write a simple Crunch job that outputs a sequence file
> >> consisting of a custom Writable.
> >>
> >> The job runs successfully, but the output is not written to the path
> >> that I specify in To.sequenceFile(), but instead to a Crunch working
> >> directory.
> >>
> >> This happens both when running the job locally and on my 1-node
> >> Hadoop test cluster, and it happens with both Crunch 0.6.0 and
> >> 0.7.0-SNAPSHOT as of today (38a97e5).
> >>
> >> Code snippet:
> >> ---
> >> public int run(String[] args) throws IOException {
> >>   CommandLine cl = parseCommandLine(args);
> >>   Path output = new Path((String) cl.getValue(OUTPUT_OPTION));
> >>   int docIdIndex = getColumnIndex(cl, "DocID");
> >>   int ldaIndex = getColumnIndex(cl, "LDA");
> >>
> >>   Pipeline pipeline = new MRPipeline(DbDumpToSeqFile.class);
> >>   pipeline.setConfiguration(getConf());
> >>   PCollection<String> lines =
> >>       pipeline.readTextFile((String) cl.getValue(INPUT_OPTION));
> >>   PTable<String, NamedQuantizedVecWritable> vectors = lines.parallelDo(
> >>       new ConvertToSeqFileDoFn(docIdIndex, ldaIndex),
> >>       tableOf(strings(), writables(NamedQuantizedVecWritable.class)));
> >>
> >>   vectors.write(To.sequenceFile(output));
> >>
> >>   PipelineResult res = pipeline.run();
> >>   return res.succeeded() ? 0 : 1;
> >> }
> >> ---
> >>
> >> Log output from a local run. Note how the intended output path
> >> "/tmp/foo.seq" is reported in the execution plan, but is not
> >> actually used.
> >> ---
> >> 2013-06-25 16:19:44.250 java[10755:1203] Unable to load realm info
> >>   from SCDynamicStore
> >> 2013-06-25 16:19:44 HadoopUtil:185 [INFO] Deleting /tmp/foo.seq
> >> 2013-06-25 16:19:44 FileTargetImpl:224 [INFO] Will write output files
> >>   to new path: /tmp/foo.seq
> >> 2013-06-25 16:19:45 JobClient:741 [WARN] No job jar file set. User
> >>   classes may not be found. See JobConf(Class) or JobConf#setJar(String).
> >> 2013-06-25 16:19:45 FileInputFormat:237 [INFO] Total input paths to
> >>   process : 1
> >> 2013-06-25 16:19:45 TrackerDistributedCacheManager:407 [INFO] Creating
> >>   MAP in /tmp/hadoop-florian/mapred/local/archive/4100035173370108016_-456151549_2075417214/file/tmp/crunch-1128974463/p1-work--1596891011522800122
> >>   with rwxr-xr-x
> >> 2013-06-25 16:19:45 TrackerDistributedCacheManager:447 [INFO] Cached
> >>   /tmp/crunch-1128974463/p1/MAP as
> >>   /tmp/hadoop-florian/mapred/local/archive/4100035173370108016_-456151549_2075417214/file/tmp/crunch-1128974463/p1/MAP
> >> 2013-06-25 16:19:45 TrackerDistributedCacheManager:470 [INFO] Cached
> >>   /tmp/crunch-1128974463/p1/MAP as
> >>   /tmp/hadoop-florian/mapred/local/archive/4100035173370108016_-456151549_2075417214/file/tmp/crunch-1128974463/p1/MAP
> >> 2013-06-25 16:19:45 CrunchControlledJob:303 [INFO] Running job
> >>   "com.issuu.mahout.utils.DbDumpToSeqFile:
> >>   Text(/Users/florian/data/docdb.first20.txt)+S0+SeqFile(/tmp/foo.seq)"
> >> 2013-06-25 16:19:45 CrunchControlledJob:304 [INFO] Job status
> >>   available at: http://localhost:8080/
> >> 2013-06-25 16:19:45 Task:792 [INFO] Task:attempt_local_0001_m_000000_0
> >>   is done. And is in the process of commiting
> >> 2013-06-25 16:19:45 LocalJobRunner:321 [INFO]
> >> 2013-06-25 16:19:45 Task:945 [INFO] Task attempt_local_0001_m_000000_0
> >>   is allowed to commit now
> >> 2013-06-25 16:19:45 FileOutputCommitter:173 [INFO] Saved output of
> >>   task 'attempt_local_0001_m_000000_0' to /tmp/crunch-1128974463/p1/output
> >> 2013-06-25 16:19:48 LocalJobRunner:321 [INFO]
> >> 2013-06-25 16:19:48 Task:904 [INFO] Task 'attempt_local_0001_m_000000_0' done.
> >> ---
> >>
> >> This crude patch makes the output end up at the right place,
> >> but breaks a lot of other tests.
> >> ---
> >> --- a/crunch-core/src/main/java/org/apache/crunch/io/impl/FileTargetImpl.java
> >> +++ b/crunch-core/src/main/java/org/apache/crunch/io/impl/FileTargetImpl.java
> >> @@ -66,7 +66,7 @@ public class FileTargetImpl implements PathTarget {
> >>    protected void configureForMapReduce(Job job, Class keyClass, Class valueClass,
> >>        Class outputFormatClass, Path outputPath, String name) {
> >>      try {
> >> -      FileOutputFormat.setOutputPath(job, outputPath);
> >> +      FileOutputFormat.setOutputPath(job, path);
> >>      } catch (Exception e) {
> >>        throw new RuntimeException(e);
> >>      }
> >> ---
> >>
> >> Am I doing something wrong, or is this a bug?
> >>
> >> Best,
> >>
> >> Florian
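[Note for anyone hitting this thread later: as I understand it, the reason the patch above breaks other tests is that Crunch deliberately configures each MapReduce job to commit into a per-pipeline working directory (the /tmp/crunch-*/p1/output seen in the logs) and then moves the committed files to the user's target path in a separate promote step; pointing FileOutputFormat straight at the final path sidesteps that step. Below is a minimal, self-contained sketch of the write-to-temp-then-promote pattern in plain java.nio. All names and the file layout are illustrative, not Crunch's actual internals.]

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

// Illustrative sketch, not Crunch's real code: a "job" commits its part
// file into a temporary working directory, and a separate promote step
// moves it to the path the caller asked for. If the promote step never
// runs, the output stays under the temp dir -- and appears to vanish once
// that dir is cleaned up (which done() does to the /tmp/crunch-* dirs).
public class TempThenPromote {

  // Commit content under workDir, then promote it into targetDir.
  static Path runJob(Path workDir, Path targetDir, String content) throws IOException {
    Files.createDirectories(workDir);
    Path committed = workDir.resolve("part-m-00000");
    Files.writeString(committed, content);  // "Saved output of task ... to <workDir>"

    Files.createDirectories(targetDir);
    Path finalPath = targetDir.resolve(committed.getFileName());
    // The promote step that writing directly to the final path would skip:
    Files.move(committed, finalPath, StandardCopyOption.REPLACE_EXISTING);
    return finalPath;
  }

  public static void main(String[] args) throws IOException {
    Path tmp = Files.createTempDirectory("crunch-sketch");
    Path out = runJob(tmp.resolve("p1/output"), tmp.resolve("foo.seq"), "hello");
    System.out.println(out + ": " + Files.readString(out));
  }
}
```

The promote-by-rename at the end is also why the committed file and the final target normally need to live on the same filesystem; a plain rename is atomic and cheap there, while a cross-filesystem move degrades to copy-and-delete.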
