Cascading generates configurations algorithmically. I find that, in general, the builder pattern makes such code incomprehensible and opaque. If memory serves, migrating to that model has been a struggle wherever there isn't an alternative.
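For reference, a rough sketch of the two edge-definition styles being discussed, hand-written for illustration against the Tez 0.5+ API; this is not the actual code Cascading emits, and keyClassName, valueClassName and conf are placeholders for whatever key/value classes and Configuration the planner has in scope:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.tez.dag.api.EdgeProperty;
    import org.apache.tez.dag.api.InputDescriptor;
    import org.apache.tez.dag.api.OutputDescriptor;
    import org.apache.tez.runtime.library.conf.UnorderedKVEdgeConfig;
    import org.apache.tez.runtime.library.input.UnorderedKVInput;
    import org.apache.tez.runtime.library.output.UnorderedKVOutput;

    // Style 1: assemble the broadcast EdgeProperty directly, mirroring the
    // edgeValues fields quoted below.
    EdgeProperty direct = EdgeProperty.create(
        EdgeProperty.DataMovementType.BROADCAST,
        EdgeProperty.DataSourceType.PERSISTED,
        EdgeProperty.SchedulingType.SEQUENTIAL,
        OutputDescriptor.create( UnorderedKVOutput.class.getName() ),
        InputDescriptor.create( UnorderedKVInput.class.getName() ) );

    // Style 2: the builder-based helper recommended below for post-0.5,
    // which lets the runtime library fill in better defaults.
    // keyClassName, valueClassName and conf are placeholders, not real values.
    EdgeProperty viaConfig = UnorderedKVEdgeConfig
        .newBuilder( keyClassName, valueClassName )
        .setFromConfiguration( conf )
        .build()
        .createDefaultBroadcastEdgeProperty();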
ckw

> On Apr 3, 2015, at 1:16 PM, Bikas Saha <[email protected]> wrote:
>
> There are no open jiras about issues when writing multiple broadcast outputs.
> That being said, it does not mean that your issue may not be the first :)
>
> We have seen some scalability issues with broadcast when done for large files
> across large clusters and have put in some mitigations for that. The
> mitigation happens in the receivers. You may have hit some issues there.
> Certainly the logs would help.
>
> From the code sample below, post 0.5, our recommendation has been to use
> *Config helper methods to build edges so that you can automatically get
> better configured edges as we make improvements under the covers.
>
> In the specific case below you could use
> UnorderedKVEdgeConfig#createDefaultBroadcastEdgeProperty()
>
> Bikas
>
> From: Chris K Wensel [mailto:[email protected]]
> Sent: Friday, April 03, 2015 12:52 PM
> To: [email protected]
> Subject: Re: BufferTooSmallException
>
> a quick update.
>
> we have been working to identify red herrings throughout the logs (one of
> which is the exception in the subject).
>
> outside of those, we have noticed trouble around a vertex broadcasting to two
> vertices. here is the edge definition (remember, there are two edges from the
> source vertex)
>
> edgeValues.outputClassName = UnorderedKVOutput.class.getName();
> edgeValues.inputClassName = UnorderedKVInput.class.getName();
>
> edgeValues.movementType = EdgeProperty.DataMovementType.BROADCAST;
> edgeValues.sourceType = EdgeProperty.DataSourceType.PERSISTED;
> edgeValues.schedulingType = EdgeProperty.SchedulingType.SEQUENTIAL;
>
> I don’t have the full logs (Cyrille may be able to follow up), but it seems
> the vertices receiving the broadcast are the ones having troubles.
>
> they are also HashJoins, so memory concerns are being looked at (the logs
> seem to be shouting something about that).
>
> but I wanted to double check whether broadcasting to two vertices from a
> single vertex has known issues.
>
> that said, i’m trying to see why these plans are being created and if
> Cascading can prevent/minimize/not-aggravate this issue. from a quick look,
> in this context, I think there is some redundancy sneaking in that needs to
> be addressed.
>
> ckw
>
> On Mar 26, 2015, at 3:17 AM, Cyrille Chépélov <[email protected]> wrote:
>
> Hi,
>
> I'm the original victim :) just sent up TEZ-2237.
>
> Sent as much of the logs as was practical up to this point; can supply on a
> direct basis as much as required to nail the issue.
>
> To give some context: these two failing DAGs are part of a meta-DAG comprised
> of 20 distinct DAGs, all generated through scalding-cascading (in cascading
> terms, there is one Cascade with 20 Jobs; when the same cascade is run with
> the traditional "hadoop" fabric instead of the experimental TEZ backend, this
> results in 460 separate MR jobs).
>
> While the 20-legged meta-DAG monster hasn't ever completed under TEZ yet, the
> progress made in the last few weeks is very encouraging, hinting at very
> significant speedups compared to MR; we definitely want to help getting to
> the point where we can compare the outputs.
>
> -- Cyrille
>
> -------- Forwarded Message --------
> Reply-To: [email protected]
> Subject: Re: BufferTooSmallException
> From: Hitesh Shah <[email protected]>
> Date: March 23, 2015 at 1:11:45 PM PDT
> To: [email protected]
>
> Hi Chris,
>
> I don’t believe this issue has been seen before. Could you file a jira for
> this with the full application logs (obtained via bin/yarn logs -applicationId)
> and the configuration used?
>
> thanks
> — Hitesh
>
> On Mar 23, 2015, at 1:01 PM, Chris K Wensel <[email protected]> wrote:
>
> Hey all
>
> We have a user running Scalding, on Cascading 3, on Tez. This exception tends
> to crop up for DAGs that hang indefinitely (this DAG has 140 vertices).
>
> It looks like the flag exception BufferTooSmallException isn’t being caught
> and forcing the buffer to reset. Nor is the exception, when passed up to the
> thread, causing the Node/DAG to fail.
>
> Or is this a misinterpretation?
>
> ckw
>
> 2015-03-23 11:32:40,445 INFO [TezChild] writers.UnorderedPartitionedKVWriter: Moving to next buffer and triggering spill
> 2015-03-23 11:32:40,496 INFO [UnorderedOutSpiller [E61683F3D94D46C2998CDC61CD112750]] writers.UnorderedPartitionedKVWriter: Finished spill 1
> 2015-03-23 11:32:40,496 INFO [UnorderedOutSpiller [E61683F3D94D46C2998CDC61CD112750]] writers.UnorderedPartitionedKVWriter: Spill# 1 complete.
> 2015-03-23 11:32:41,185 ERROR [TezChild] hadoop.TupleSerialization$SerializationElementWriter: failed serializing token: 181 with classname: scala.Tuple2
> org.apache.tez.runtime.library.common.writers.UnorderedPartitionedKVWriter$BufferTooSmallException
>     at org.apache.tez.runtime.library.common.writers.UnorderedPartitionedKVWriter$ByteArrayOutputStream.write(UnorderedPartitionedKVWriter.java:651)
>     at org.apache.tez.runtime.library.common.writers.UnorderedPartitionedKVWriter$ByteArrayOutputStream.write(UnorderedPartitionedKVWriter.java:646)
>     at java.io.DataOutputStream.write(DataOutputStream.java:88)
>     at java.io.DataOutputStream.writeInt(DataOutputStream.java:198)
>     at com.twitter.chill.hadoop.KryoSerializer.serialize(KryoSerializer.java:50)
>     at cascading.tuple.hadoop.TupleSerialization$SerializationElementWriter.write(TupleSerialization.java:705)
>     at cascading.tuple.io.TupleOutputStream.writeElement(TupleOutputStream.java:114)
>     at cascading.tuple.io.TupleOutputStream.write(TupleOutputStream.java:89)
>     at cascading.tuple.io.TupleOutputStream.writeTuple(TupleOutputStream.java:64)
>     at cascading.tuple.hadoop.io.TupleSerializer.serialize(TupleSerializer.java:37)
>     at cascading.tuple.hadoop.io.TupleSerializer.serialize(TupleSerializer.java:28)
>     at org.apache.tez.runtime.library.common.writers.UnorderedPartitionedKVWriter.write(UnorderedPartitionedKVWriter.java:212)
>     at org.apache.tez.runtime.library.common.writers.UnorderedPartitionedKVWriter.write(UnorderedPartitionedKVWriter.java:194)
>     at cascading.flow.tez.stream.element.OldOutputCollector.collect(OldOutputCollector.java:51)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> […]
>
> —
> Chris K Wensel
> [email protected]
>
> —
> Chris K Wensel
> [email protected]
>
> —
> Chris K Wensel
> [email protected]

—
Chris K Wensel
[email protected]
