More code, as requested! Hope this sheds some light on what I may be doing 
wrong…
Thanks Jeff and Everett (and others) for all the help!

CODE

logger.info("Generating Map-Reduce Pipeline...");
Pipeline pipeline = new MRPipeline(MyClass.class, "Crunch Pipeline", 
crunchConf);

logger.info("Establishing OrcFile Target for Later Output...");
OrcFileTarget target = new OrcFileTarget(new Path(outputPath));
//</editor-fold>

// =======================================================================
// * INGEST DATA
// * Ingest a text file, and for every row in it, create a Java object.
// =======================================================================

//<editor-fold desc="== Ingestion, Object Creation ==">
logger.info("Reading file (" + inputPath + ") into PCollection...");
PCollection<String> my_data = pipeline.readTextFile(inputPath);
logger.info("# of lines ingested from (" + inputPath + "): " + 
my_data.length().getValue());

logger.info("Converting Data to 'RecordStageOne' objects...");
PCollection<RecordStageOne> records = 
my_data.parallelDo(DoFn_CreateJavaRecords(), 
Avros.records(RecordStageOne.class));
logger.info("Records created: " + records.length().getValue() + " out of " + 
my_data.length().getValue() + " rows in file.");

//</editor-fold>

// =======================================================================
// * APPLY INTERVAL CALCULATIONS TO CURRENT RECORD SET
// * For every record, Calculate and Apply Affected Intervals
// =======================================================================

//<editor-fold desc="== Apply Intervals ==">
logger.info("Determining and Applying Working Intervals on data...");
PCollection<RecordStageTwo> records_with_intervals = 
records.parallelDo(DoFn_ApplyIntervals(time_intervals),
        Avros.records(RecordStageTwo.class));

logger.info("Records successfully processed in Apply Intervals step: " + 
records_with_intervals.length().getValue() + "/" + records.length().getValue());

pipeline.run();
//</editor-fold>

// It’s at this point that the Crunch planner combines the rest of the
// parallelDo calls into a single job, which I think overwhelms the single
// worker the job runs on.

// =======================================================================
// * CALCULATE CONTRIBUTION RESULTS (THE 3 COUNTS)
// * Report every record that matches a primary key and count its values.
// =======================================================================

//<editor-fold desc="== Calculate Count Contributions ==">
logger.info("Calculating contributions to clock in count, employee count, and "
        + "hours worked...");
PTable<String, Tuple3<Integer, Integer, Integer>> calculatedSet = 
records_with_intervals.parallelDo(DoFn_CalculateResults(time_intervals),
        tableOf(strings(), triples(ints(), ints(), ints())));


pipeline.run();

//</editor-fold>

// =======================================================================
// * GROUP AND AGGREGATE
// * Group records by their key and sum their values (the 3 counts)
// =======================================================================

//<editor-fold desc="== Group and Aggregate ==">
logger.info("Grouping Records By Key and Aggregating Their New Values...");
PTable<String, Tuple3<Integer, Integer, Integer>> reducer = calculatedSet
        .groupByKey()
        .combineValues(Aggregators.tripAggregator(
                Aggregators.SUM_INTS(),
                Aggregators.SUM_INTS(),
                Aggregators.SUM_INTS()));


pipeline.run();

//</editor-fold>

// =======================================================================
// * PRODUCE FINAL RECORDS IN ORC FORMAT
// * Convert final records to ORC format.
// =======================================================================

//<editor-fold desc="== Convert Data to ORC Records ==">
logger.info("Producing Final ORC-Format Records...");
PCollection<TupleN> finalDataOrc = 
reducer.parallelDo(DoFn_ProduceFinalRecords(), Orcs.tuples(
        Writables.ints(),
        Writables.ints(),
        Writables.strings(),
        Writables.strings(),
        Writables.strings(),
        Writables.ints(),
        Writables.ints(),
        Writables.ints(),
        Writables.ints(),
        Writables.strings()
));

//</editor-fold>

// =======================================================================
// * WRITE TO ORC FILE AND EXECUTE PIPELINE
// * Report results in final .orc file.
// =======================================================================

//<editor-fold desc="== ORC File Output, Pipeline Closure, and Exit ==">
logger.info("Writing ORC file(s) to " + outputPath);
pipeline.write(finalDataOrc, target, Target.WriteMode.OVERWRITE);

PipelineResult result = pipeline.done();
logger.info("Pipeline ending.");

END OF CODE

The issue remains: Crunch still combines all the jobs halfway through the code, 
even with calls to pipeline.run() and with max jobs set to 1.
Help/Guidance appreciated!
---------------------------------------------------------------------------
Landon Robinson
Big Data/Hadoop Engineer
---------------------------------------------------------------------------

From: Jeff Quinn
Date: Tuesday, November 24, 2015 at 7:59 PM
To: LCI
Cc: Apache Crunch Mailing List
Subject: Re: Crunch Planner Hint to Not Combine Tasks

For the MRExecutor, each materialized PCollection is backed by a Path; if 
something deletes that Path, you are left with this error.

So it looks like the Crunch temporary directory was cleaned up by the time you 
called #write. Can you post more code? Is there a call to Pipeline#done 
somewhere before this? (That is the only call that cleans up the Crunch tmp 
directory, as far as I know.)

On Tue, Nov 24, 2015 at 4:54 PM, Robinson, Landon - Landon wrote:
Jeff/Everett,
Thanks so much! Though I am slightly confused... I implemented what I believe 
you were going for: after a DoFn is processed, I used the write command Everett 
mentioned, followed by a call to pipeline.run, but received this error:

2015-11-24 19:52:31,346 ERROR [main] org.apache.crunch.materialize.MaterializableIterable: Could not materialize: SeqFile(/tmp/crunch-245883570/p1)
java.io.IOException: No files found to materialize at: /tmp/crunch-245883570/p1

Any ideas?
---------------------------------------------------------------------------
Landon Robinson
Big Data/Hadoop Engineer
---------------------------------------------------------------------------

From: Jeff Quinn
Reply-To: Apache Crunch Mailing List
Date: Tuesday, November 24, 2015 at 1:36 PM
To: LCI
Cc: Apache Crunch Mailing List
Subject: Re: Crunch Planner Hint to Not Combine Tasks

Hey Landon,

Happy to help!

As Everett said, you can even skip the "Ingest that file into a new 
PCollection" step; as long as you write to a SequenceFile target, this will be 
implicit.

As for #run vs #done, what we do is call #run in between each segment of DoFns 
and then #done once finally at the end. #run and #done do basically the same 
thing (#done calls #run), except #done cleans up the crunch temporary 
directories after calling #run.
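
The distinction matters for anything that reads intermediate output later. As a rough illustration only, here is a toy model in plain Java. It is not the Crunch API: `ToyTempPipeline`, `stage`, and `materialize` are hypothetical names invented for this sketch. It assumes, as described above, that run() leaves stage output in a temporary directory and that done() calls run() and then deletes that directory.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/** Toy model of run() vs done(). NOT the Crunch API; every name here is hypothetical. */
public class ToyTempPipeline {
    private final Path tmpDir;
    private final Map<String, List<String>> pending = new LinkedHashMap<>();

    public ToyTempPipeline() throws IOException {
        tmpDir = Files.createTempDirectory("toy-crunch-");
    }

    /** Declares a stage and returns the path that will back its output once run() is called. */
    public Path stage(String name, List<String> records) {
        pending.put(name, records);
        return tmpDir.resolve(name);
    }

    /** Executes every pending stage, leaving its output in the temporary directory. */
    public void run() throws IOException {
        for (Map.Entry<String, List<String>> e : pending.entrySet()) {
            Files.write(tmpDir.resolve(e.getKey()), e.getValue());
        }
        pending.clear();
    }

    /** Runs anything still pending, then deletes the temporary directory and its contents. */
    public void done() throws IOException {
        run();
        List<Path> paths;
        try (Stream<Path> walk = Files.walk(tmpDir)) {
            // Reverse order so that files are deleted before their parent directory.
            paths = walk.sorted(Comparator.reverseOrder()).collect(Collectors.toList());
        }
        for (Path p : paths) {
            Files.delete(p);
        }
    }

    /** Reads a stage's output back; fails if the backing path no longer exists. */
    public static List<String> materialize(Path p) throws IOException {
        if (!Files.exists(p)) {
            throw new IOException("No files found to materialize at: " + p);
        }
        return Files.readAllLines(p);
    }

    public static void main(String[] args) throws IOException {
        ToyTempPipeline pipeline = new ToyTempPipeline();
        Path p1 = pipeline.stage("p1", Arrays.asList("a", "b"));
        pipeline.run();
        System.out.println(materialize(p1)); // output is still on disk after run()

        pipeline.done();                      // cleanup removes the backing path
        try {
            materialize(p1);
        } catch (IOException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

In this model, reading p1 between run() and done() works, while reading it after done() fails because the backing path is gone. Under these assumptions, done() belongs after the last read or write of intermediate data.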




On Tue, Nov 24, 2015 at 10:00 AM, Robinson, Landon - Landon wrote:
Apologies, pipeline.execute should be pipeline.done
---------------------------------------------------------------------------
Landon Robinson
Big Data/Hadoop Engineer
Lowe’s Companies Inc. | IT Business Intelligence
---------------------------------------------------------------------------

From: <Robinson>, LCI
Reply-To: Apache Crunch Mailing List
Date: Tuesday, November 24, 2015 at 12:55 PM
To: Apache Crunch Mailing List, Jeff Quinn
Subject: Re: Crunch Planner Hint to Not Combine Tasks

Jeff,

Thanks for that awesome set of tips. Building on your solution, here’s some 
information about ours:
Our program:

  *   Starts the MR pipeline
  *   Does some DoFn ParallelDos
  *   Calls pipeline.execute

We never leveraged pipeline.run(). Going with your suggestion, would my course 
of action be:

  *   Start the MR pipeline
  *   Do some of the DoFn parallelDos
  *   Call pipeline.write
  *   Call pipeline.run
  *   Ingest that file into a new PCollection
  *   Call another DoFn (or more)
  *   Call pipeline.execute

Does that seem in line with your recommendation, Jeff? Let me know if my logic 
needs adjusting…
- Landon
---------------------------------------------------------------------------
Landon Robinson
Big Data/Hadoop Engineer
---------------------------------------------------------------------------

From: Jeff Quinn
Reply-To: Apache Crunch Mailing List
Date: Tuesday, November 24, 2015 at 12:00 PM
To: Apache Crunch Mailing List
Subject: Re: Crunch Planner Hint to Not Combine Tasks

Hey Landon,

Our team has dealt with this exact problem before. Our solution was to call 
PCollection#write after each DoFn or string of DoFns, then call Pipeline#run, 
which will do an iteration of Crunch planning / job submission, then use that 
same PCollection object for the next round of DoFns, call #write, and then 
call #run again, etc.

#parallelDo -> #write -> #run -> repeat

The importance of calling #write is that the Crunch planner will not 
actually do any work unless it has seen that there is at least one 
materialization of the data. This technique allows you to guarantee your DoFns 
are segmented into different MR jobs, as DoFns cannot be combined if they are 
already completed.

Hope this helps,

Jeff
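
The effect of the #parallelDo -> #write -> #run cycle can be pictured with a toy model in plain Java. This is not the Crunch API, and it ignores shuffle boundaries. `ToyPlanner` and its methods are hypothetical, and the DoFn names are borrowed from elsewhere in this thread purely as labels. The sketch assumes, per the description above, that run() does nothing until an output has been requested, and that everything still pending at that point is fused into one job.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of lazy planning. NOT the Crunch API; every name here is hypothetical. */
public class ToyPlanner {
    private final List<String> pending = new ArrayList<>();     // declared but not yet executed
    private final List<List<String>> jobs = new ArrayList<>();  // one inner list per submitted job
    private boolean outputRequested = false;

    /** Declares a stage; nothing runs yet. */
    public ToyPlanner parallelDo(String doFnName) {
        pending.add(doFnName);
        return this;
    }

    /** Requests that the current result be written out. */
    public ToyPlanner write() {
        outputRequested = true;
        return this;
    }

    /** Submits one job containing every pending stage, but only if an output was requested. */
    public void run() {
        if (!outputRequested || pending.isEmpty()) {
            return; // nothing was asked for, so nothing is planned
        }
        jobs.add(new ArrayList<>(pending)); // all pending stages are fused together
        pending.clear();                    // completed stages cannot be fused with later ones
        outputRequested = false;
    }

    public List<List<String>> jobs() {
        return jobs;
    }

    public static void main(String[] args) {
        // run() without write(): the stages pile up and end in a single job.
        ToyPlanner fused = new ToyPlanner();
        fused.parallelDo("applyIntervals");
        fused.run();
        fused.parallelDo("calculateResults");
        fused.run();
        fused.parallelDo("produceFinalRecords").write();
        fused.run();
        System.out.println(fused.jobs());

        // write() then run() after each stage: one job per stage.
        ToyPlanner segmented = new ToyPlanner();
        for (String stage : new String[] {"applyIntervals", "calculateResults", "produceFinalRecords"}) {
            segmented.parallelDo(stage).write();
            segmented.run();
        }
        System.out.println(segmented.jobs());
    }
}
```

The first pipeline ends up with one job holding all three stages; the second ends up with three jobs of one stage each, which is the segmentation the cycle is meant to guarantee.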

On Tue, Nov 24, 2015 at 6:14 AM, Ron Hashimshony wrote:
Try setting mapreduce.input.fileinputformat.split.minsize and 
mapreduce.input.fileinputformat.split.maxsize to a value lower than the 
default (usually 64 MB).
If you know of a specific DoFn in which this is required, it is better to set 
it there, in its configure function.
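
To see why a smaller maximum split size spreads the work, here is a back-of-the-envelope sketch in plain Java. It assumes the small input files are packed together into splits of at most the maximum split size, which is an assumption about the input format and not something confirmed in this thread. It takes the 19 files of 10 MB and the scaleFactor of 40 from the original question quoted below. The class name, the helper, and the candidate split sizes are illustrative only.

```java
public class SplitEstimate {

    /** Ceiling division: how many splits of at most maxSplitBytes cover totalBytes. */
    public static long splitCount(long totalBytes, long maxSplitBytes) {
        return (totalBytes + maxSplitBytes - 1) / maxSplitBytes;
    }

    public static void main(String[] args) {
        final long mb = 1024L * 1024L;
        final long totalInput = 19 * 10 * mb; // 19 files of 10 MB, as in the question
        final double scaleFactor = 40.0;      // the scaleFactor quoted in the question

        // Hypothetical maximum split sizes to compare (assumptions, not cluster defaults).
        for (long maxSplitMb : new long[] {128, 64, 16}) {
            long splits = splitCount(totalInput, maxSplitMb * mb);
            // Estimated volume emitted per split if output is input times scaleFactor.
            double emittedPerSplitMb = (totalInput / (double) splits) * scaleFactor / mb;
            System.out.printf("max split %d MB -> %d splits, roughly %.0f MB emitted per split%n",
                    maxSplitMb, splits, emittedPerSplitMb);
        }
    }
}
```

Under these assumptions a 128 MB maximum reproduces the 2 splits reported in the question, and lowering the maximum increases the number of map tasks while shrinking the volume each one emits. This is only an estimate of the trend, not a prediction of actual spill behaviour.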

On Tue, Nov 24, 2015 at 3:28 PM Robinson, Landon - Landon wrote:
Hi all,

I have a Crunch job that tries to combine the last four tasks of my program 
into one M/R job.
That’s normally not a problem, but my data starts small and grows exponentially 
in the largest of those DoFn tasks, resulting in spills to disk (local, not 
HDFS).

I’ve already:

  *   Implemented scaleFactor (set to 40.0f) on the DoFn that emits more 
records than it consumes
  *   Set the io.sort.mb parameter to the cluster setting, which is 1792
  *   Implemented map-side compression with Snappy

The data set I’m ingesting comes from a previous map-reduce job and consists of 
19 files of 10 MB each (which in Crunch comes to 2 splits).
Help?
---------------------------------------------------------------------------
Landon Robinson
Big Data/Hadoop Engineer
---------------------------------------------------------------------------
NOTICE: All information in and attached to the e-mails below may be 
proprietary, confidential, privileged and otherwise protected from improper or 
erroneous disclosure. If you are not the sender's intended recipient, you are 
not authorized to intercept, read, print, retain, copy, forward, or disseminate 
this message. If you have erroneously received this communication, please 
notify the sender immediately by phone (704-758-1000<tel:%28704-758-1000>) or 
by e-mail and destroy all copies of this message electronic, paper, or 
otherwise.

By transmitting documents via this email: Users, Customers, Suppliers and 
Vendors collectively acknowledge and agree the transmittal of information via 
email is voluntary, is offered as a convenience, and is not a secured method of 
communication; Not to transmit any payment information E.G. credit card, debit 
card, checking account, wire transfer information, passwords, or sensitive and 
personal information E.G. Driver's license, DOB, social security, or any other 
information the user wishes to remain confidential; To transmit only 
non-confidential information such as plans, pictures and drawings and to assume 
all risk and liability for and indemnify Lowe's from any claims, losses or 
damages that may arise from the transmittal of documents or including 
non-confidential information in the body of an email transmittal. Thank you.


DISCLAIMER: The contents of this email, including any attachments, may contain 
information that is confidential, proprietary in nature, protected health 
information (PHI), or otherwise protected by law from disclosure, and is solely 
for the use of the intended recipient(s). If you are not the intended 
recipient, you are hereby notified that any use, disclosure or copying of this 
email, including any attachments, is unauthorized and strictly prohibited. If 
you have received this email in error, please notify the sender of this email. 
Please delete this and all copies of this email from your system. Any opinions 
either expressed or implied in this email and all attachments, are those of its 
author only, and do not necessarily reflect those of Nuna Health, Inc.