Awesome, that works. Thanks, Sid!

Johannes

On 06 Aug 2014, at 19:21, Siddharth Seth wrote:

> Johannes,
> You need to be using the confMap available on the setComparator API to make
> it visible for the comparator.
>
> On Wed, Aug 6, 2014 at 5:54 AM, Johannes Zillmann wrote:
> Hey Sid,
>
> that was fast. Unfortunately, that doesn’t solve the problem.
> Passing in the custom property via partitionConfMap makes it available at the
> edgeInput, but not at the edgeOutput.
> Job fails at:
>     at myPartitionerClassName.setConf(TezRecordComparator.java:39)
>     at org.apache.hadoop.util.ReflectionUtils.setConf(ReflectionUtils.java:73)
>     at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:133)
>     at org.apache.tez.runtime.library.common.ConfigUtils.getIntermediateInputKeyComparator(ConfigUtils.java:135)
>     at org.apache.tez.runtime.library.common.shuffle.impl.MergeManager.finalMerge(MergeManager.java:808)
>     at org.apache.tez.runtime.library.common.shuffle.impl.MergeManager.close(MergeManager.java:465)
>     at org.apache.tez.runtime.library.common.shuffle.impl.Shuffle.cleanupMerger(Shuffle.java:413)
>     at org.apache.tez.runtime.library.common.shuffle.impl.Shuffle.cleanupIgnoreErrors(Shuffle.java:428)
>     at org.apache.tez.runtime.library.common.shuffle.impl.Shuffle.access$1900(Shuffle.java:75)
>     at org.apache.tez.runtime.library.common.shuffle.impl.Shuffle$ShuffleRunnerFutureCallback.onFailure(Shuffle.java:474)
>     at com.google.common.util.concurrent.Futures$6.run(Futures.java:977)
>
> Johannes
>
> On 06 Aug 2014, at 09:08, Siddharth Seth wrote:
>
> > TEZ-1379 went in. You should be able to use this properly now.
> >
> > On Tue, Aug 5, 2014 at 11:27 PM, Johannes Zillmann wrote:
> > Hey Sid,
> > On 05 Aug 2014, at 21:05, Siddharth Seth wrote:
> >
> > > The last configuration parameter to
> > > "OrderedPartitionedKVEdgeConfigurer.newBuilder(keyClassName,
> > > valueClassName, myPartitionerClassName, jobConfForShuffleSort);" is the
> > > configuration for the partitioner itself. That's only used in the Output,
> > > and hence is not available in the consuming Input.
> > >
> > > It looks like we're missing the option to set a Configuration for the
> > > comparator. There are a couple of other changes required in the
> > > EdgeConfigurers - I'll create a jira and post a patch later today.
> > Cool, thanks!
> >
> > > One of the big reasons to separate out the Configurations is to limit the
> > > size of the payload generated. Using a generic conf (which usually ends
> > > up inheriting from JobConf etc.) ends up setting a large number of keys
> > > (1000+ in some cases), of which very few are actually used.
> > > setFromConfiguration(...) actually strips out unused keys. The
> > > partitionerConf parameter is meant to be a very specific Configuration
> > > only for the Partitioner (should only contain the limited set of keys
> > > required to run the partitioner). Similarly for the Comparator conf,
> > > once it is added. Tez has no way of knowing what a valid set of keys for
> > > the partitioner, comparator and combiner is, since these are all
> > > user-specified classes.
> >
> > ++++1 yeah, basically I like moving away from configuration!
> > Just this time it hit me a bit ;)
> >
> > > Till I can get a patch going for this, your usage model to get this
> > > working is likely the only one which will work.
> >
> > Ok will do!
> > Johannes
> >
> > > On Tue, Aug 5, 2014 at 8:23 AM, Johannes Zillmann wrote:
> > > Hey guys,
> > >
> > > I just upgraded my application to the most current master code of Tez
> > > and ran into a problem with setting up my custom key comparator.
> > > It implements org.apache.hadoop.conf.Configurable and expects a custom
> > > property in the passed-in configuration.
> > >
> > > So initially I tried:
> > >   JobConf jobConfForShuffleSort = new JobConf();
> > >   jobConfForShuffleSort.set("myCustomProperty", "value");
> > >   Builder edgeConfBuilder =
> > >       OrderedPartitionedKVEdgeConfigurer.newBuilder(keyClassName,
> > >           valueClassName, myPartitionerClassName, jobConfForShuffleSort);
> > >
> > > But the property does not come through to the instance of
> > > 'myPartitionerClassName'.
> > > Basically I see the comparator instantiated 2 times:
> > >
> > > (1) Here the custom property is available:
> > >   java.lang.Exception
> > >     at myPartitionerClassName.setConf(TezRecordComparator.java:42)
> > >     at org.apache.hadoop.util.ReflectionUtils.setConf(ReflectionUtils.java:73)
> > >     at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:133)
> > >     at org.apache.tez.runtime.library.common.ConfigUtils.getIntermediateOutputKeyComparator(ConfigUtils.java:125)
> > >     at org.apache.tez.runtime.library.common.sort.impl.ExternalSorter.<init>(ExternalSorter.java:158)
> > >     at org.apache.tez.runtime.library.common.sort.impl.dflt.DefaultSorter.<init>(DefaultSorter.java:116)
> > >     at org.apache.tez.runtime.library.output.OnFileSortedOutput.start(OnFileSortedOutput.java:109)
> > >     at SimpleVertexProcessor.initializeInputOutputs(SimpleVertexProcessor.java:190)
> > >
> > > (2) Here it is not:
> > >   java.lang.Exception
> > >     at myPartitionerClassName.setConf(TezRecordComparator.java:42)
> > >     at org.apache.hadoop.util.ReflectionUtils.setConf(ReflectionUtils.java:73)
> > >     at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:133)
> > >     at org.apache.tez.runtime.library.common.ConfigUtils.getIntermediateInputKeyComparator(ConfigUtils.java:135)
> > >     at org.apache.tez.runtime.library.common.shuffle.impl.MergeManager.finalMerge(MergeManager.java:808)
> > >     at org.apache.tez.runtime.library.common.shuffle.impl.MergeManager.close(MergeManager.java:465)
> > >     at org.apache.tez.runtime.library.common.shuffle.impl.Shuffle$RunShuffleCallable.call(Shuffle.java:344)
> > >
> > > Found the following workaround:
> > >   Configuration payloadConf =
> > >       TezUtils.createConfFromUserPayload(edgeProperty.getEdgeDestination().getUserPayload());
> > >   payloadConf.set("myCustomProperty", "value");
> > >   edgeProperty.getEdgeDestination().setUserPayload(TezUtils.createUserPayloadFromConf(payloadConf));
> > >
> > > I think it boils down to the property being passed to the edge input
> > > but not to its destination!?
> > > However, is there some smarter way of making that property available to
> > > all instantiations of the comparator?
> > > I tried using
> > >   edgeConfBuilder.setAdditionalConfiguration(...)
> > >   edgeConfBuilder.configureOutput().setAdditionalConfiguration(…)
> > > but that seems to filter out custom properties.
> > >
> > > Also, do you plan to use a non-configuration-based payload mechanism for
> > > the edge stuff like you did for the input, output, processor?
> > >
> > > Any enlightenment appreciated!
> > > Johannes
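
The two points from the thread (keep each component's configuration small, and make the comparator's properties visible wherever the comparator is instantiated) can be illustrated with a minimal plain-Java sketch. This is not Tez or Hadoop code: `ComparatorConfSketch`, `ConfiguredComparator` and `restrictTo` are hypothetical names made up for illustration, a `Map<String, String>` stands in for the comparator conf map, and the "descending" value is an assumed meaning for "myCustomProperty".

```java
import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

final class ComparatorConfSketch {

    // Hypothetical comparator that cannot work without "myCustomProperty",
    // similar in spirit to a Configurable comparator failing in setConf().
    static final class ConfiguredComparator implements Comparator<String> {
        private final boolean descending;

        ConfiguredComparator(Map<String, String> conf) {
            String value = conf.get("myCustomProperty");
            if (value == null) {
                throw new IllegalStateException("myCustomProperty is missing");
            }
            this.descending = value.equals("descending");
        }

        @Override
        public int compare(String a, String b) {
            return descending ? b.compareTo(a) : a.compareTo(b);
        }
    }

    // Copies only the keys a component actually needs, so the payload that is
    // shipped with the edge stays small (assumed behaviour, for illustration).
    static Map<String, String> restrictTo(Map<String, String> full, Set<String> wanted) {
        Map<String, String> small = new HashMap<>();
        for (String key : wanted) {
            String value = full.get(key);
            if (value != null) {
                small.put(key, value);
            }
        }
        return small;
    }

    public static void main(String[] args) {
        // A "generic" conf with many keys the comparator never reads.
        Map<String, String> jobConf = new HashMap<>();
        for (int i = 0; i < 1000; i++) {
            jobConf.put("unrelated.key." + i, "x");
        }
        jobConf.put("myCustomProperty", "descending");

        // Comparator-specific conf: only the one key it needs.
        Map<String, String> comparatorConf = restrictTo(jobConf, Set.of("myCustomProperty"));

        // The same small conf must reach both instantiations: the sorting
        // (producer) side and the merging (consumer) side.
        Comparator<String> producerSide = new ConfiguredComparator(comparatorConf);
        Comparator<String> consumerSide = new ConfiguredComparator(comparatorConf);

        System.out.println("keys shipped: " + comparatorConf.size());
        System.out.println("orderings agree: "
                + (producerSide.compare("a", "b") == consumerSide.compare("a", "b")));
    }
}
```

If the consumer side were built from a conf that lacks the property, the constructor would throw, which mirrors the failure seen in the second stack trace above.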
