Hey Sid,

I've started using OnFileUnorderedPartitionedKVOutput, and now I’m having the
same problem with my configured keySerializationClass.
With OnFileSortedOutput I think this is covered by the comparatorConf map, which
I use to pass the custom properties for both the comparator and the serializer.

Any ideas ?
Btw, would it be an idea to have input and output custom properties instead of
partitionConf, comparatorConf and maybe serializerConf?

Johannes
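
The problem discussed in this thread (a user class is instantiated on both sides of an edge, but its custom property only reaches one side) can be illustrated without Tez at all. The following is only a minimal sketch under stated assumptions: `ComparatorConfSketch`, `CustomKeyComparator` and `narrow` are hypothetical names, a plain `Map` stands in for a Hadoop `Configuration`, and the value "descending" is made up for illustration. It is not the Tez API.

```java
import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration only; none of these names exist in Tez or Hadoop.
final class ComparatorConfSketch {

    // Stand-in for a Configurable key comparator: it cannot be built
    // unless its custom property is present in the conf it is given.
    static final class CustomKeyComparator implements Comparator<String> {
        private final boolean descending;

        CustomKeyComparator(Map<String, String> conf) {
            String order = conf.get("myCustomProperty");
            if (order == null) {
                throw new IllegalStateException("myCustomProperty is not set");
            }
            this.descending = order.equals("descending");
        }

        @Override
        public int compare(String a, String b) {
            return descending ? b.compareTo(a) : a.compareTo(b);
        }
    }

    // Copies only the listed keys, mimicking a small comparator-specific
    // conf instead of shipping a full job configuration in the payload.
    static Map<String, String> narrow(Map<String, String> full, String... keys) {
        Map<String, String> out = new HashMap<>();
        for (String key : keys) {
            if (full.containsKey(key)) {
                out.put(key, full.get(key));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, String> jobConf = new HashMap<>();
        jobConf.put("myCustomProperty", "descending");
        jobConf.put("some.unrelated.key", "x");

        Map<String, String> comparatorConf = narrow(jobConf, "myCustomProperty");

        // Each side of the edge builds its own instance, so both need the map.
        Comparator<String> outputSide = new CustomKeyComparator(comparatorConf);
        Comparator<String> inputSide = new CustomKeyComparator(comparatorConf);
        System.out.println(outputSide.compare("a", "b") == inputSide.compare("a", "b"));
    }
}
```

If the narrowed map reached only one of the two instantiations, the other constructor would throw, which roughly mirrors the failure reported in the stack traces quoted below.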


On 11 Aug 2014, at 11:55, Johannes Zillmann <[email protected]> wrote:

> Awesome, that works, Thanks Sid!
> 
> Johannes
> 
> On 06 Aug 2014, at 19:21, Siddharth Seth <[email protected]> wrote:
> 
>> Johannes,
>> You need to be using the confMap available on the setComparator API to make 
>> it visible for the comparator.
>> 
>> 
>> On Wed, Aug 6, 2014 at 5:54 AM, Johannes Zillmann <[email protected]> 
>> wrote:
>> Hey Sid,
>> 
>> that was fast. Unfortunately, that doesn’t solve the problem.
>> Passing in the custom property via partitionConfMap makes it available at 
>> the edgeInput, but not at the edgeOutput.
>> Job fails at:
>>        at myPartitionerClassName.setConf(TezRecordComparator.java:39)
>>        at 
>> org.apache.hadoop.util.ReflectionUtils.setConf(ReflectionUtils.java:73)
>>        at 
>> org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:133)
>>        at 
>> org.apache.tez.runtime.library.common.ConfigUtils.getIntermediateInputKeyComparator(ConfigUtils.java:135)
>>        at 
>> org.apache.tez.runtime.library.common.shuffle.impl.MergeManager.finalMerge(MergeManager.java:808)
>>        at 
>> org.apache.tez.runtime.library.common.shuffle.impl.MergeManager.close(MergeManager.java:465)
>>        at 
>> org.apache.tez.runtime.library.common.shuffle.impl.Shuffle.cleanupMerger(Shuffle.java:413)
>>        at 
>> org.apache.tez.runtime.library.common.shuffle.impl.Shuffle.cleanupIgnoreErrors(Shuffle.java:428)
>>        at 
>> org.apache.tez.runtime.library.common.shuffle.impl.Shuffle.access$1900(Shuffle.java:75)
>>        at 
>> org.apache.tez.runtime.library.common.shuffle.impl.Shuffle$ShuffleRunnerFutureCallback.onFailure(Shuffle.java:474)
>>        at com.google.common.util.concurrent.Futures$6.run(Futures.java:977)
>> 
>> Johannes
>> 
>> 
>> On 06 Aug 2014, at 09:08, Siddharth Seth <[email protected]> wrote:
>> 
>>> TEZ-1379 went in. You should be able to use this properly now.
>>> 
>>> 
>>> On Tue, Aug 5, 2014 at 11:27 PM, Johannes Zillmann 
>>> <[email protected]> wrote:
>>> Hey Sid,
>>> On 05 Aug 2014, at 21:05, Siddharth Seth <[email protected]> wrote:
>>> 
>>>> The last configuration parameter to " 
>>>> OrderedPartitionedKVEdgeConfigurer.newBuilder(keyClassName, 
>>>> valueClassName, myPartitionerClassName, jobConfForShuffleSort);" is the 
>>>> configuration for the partitioner itself. That's only used in the Output - 
>>>> and hence is not available in the consuming Input.
>>>> 
>>>> It looks like we're missing the option to set a Configuration for the 
>>>> comparator. There's a couple of other changes required in the 
>>>> EdgeConfigurers - I'll create a jira and post a patch later today.
>>> Cool, thanks!
>>> 
>>>> 
>>>> One of the big reasons to separate out the Configurations is to limit the 
>>>> size of the payload generated. Using a generic conf (which usually ends up 
>>>> inheriting from JobConf etc) ends up setting a large number of keys (1000+ 
>>>> in some cases), of which very few are actually used. setFromConfiguration(...) 
>>>> actually strips out unused keys. The partitionerConf parameter is meant to 
>>>> be a very specific Configuration only for the Partitioner (should only 
>>>> contain the limited set of keys required to run the partitioner). 
>>>> Similarly for the Comparator conf - once it is added. Tez has no way of 
>>>> knowing what a valid set of keys for the partitioner, comparator and 
>>>> combiner are - since these are all user specified classes.
>>> 
>>> ++++1 yeah, basically i like moving away from configuration!
>>> Just this time it hit me a bit ;)
>>> 
>>>> 
>>>> Till I can get a patch going for this, your usage model to get this 
>>>> working is likely the only one which will work.
>>> 
>>> Ok will do!
>>> Johannes
>>> 
>>>> 
>>>> 
>>>> On Tue, Aug 5, 2014 at 8:23 AM, Johannes Zillmann 
>>>> <[email protected]> wrote:
>>>> Hey guys,
>>>> 
>>>> I just upgraded my application to the most current master code of Tez and
>>>> ran into a problem with setting up my custom key comparator.
>>>> It implements org.apache.hadoop.conf.Configurable and expects a custom 
>>>> property in the passed in configuration.
>>>> 
>>>> So initially i tried:
>>>>        JobConf jobConfForShuffleSort = new JobConf();
>>>>        jobConfForShuffleSort.set("myCustomProperty", "value");
>>>>        Builder edgeConfBuilder = 
>>>> OrderedPartitionedKVEdgeConfigurer.newBuilder(keyClassName, 
>>>> valueClassName, myPartitionerClassName, jobConfForShuffleSort);
>>>> 
>>>> But the property does not come through to the instance of 
>>>> ‘myPartitionerClassName’.
>>>> Basically i see the comparator instantiated 2 times:
>>>> 
>>>> (1) Here the custom property is available:
>>>> java.lang.Exception
>>>>        at myPartitionerClassName.setConf(TezRecordComparator.java:42)
>>>>        at 
>>>> org.apache.hadoop.util.ReflectionUtils.setConf(ReflectionUtils.java:73)
>>>>        at 
>>>> org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:133)
>>>>        at 
>>>> org.apache.tez.runtime.library.common.ConfigUtils.getIntermediateOutputKeyComparator(ConfigUtils.java:125)
>>>>        at 
>>>> org.apache.tez.runtime.library.common.sort.impl.ExternalSorter.<init>(ExternalSorter.java:158)
>>>>        at 
>>>> org.apache.tez.runtime.library.common.sort.impl.dflt.DefaultSorter.<init>(DefaultSorter.java:116)
>>>>        at 
>>>> org.apache.tez.runtime.library.output.OnFileSortedOutput.start(OnFileSortedOutput.java:109)
>>>>        at 
>>>> SimpleVertexProcessor.initializeInputOutputs(SimpleVertexProcessor.java:190)
>>>> 
>>>> (2) Here it is not:
>>>>  java.lang.Exception
>>>>        at myPartitionerClassName.setConf(TezRecordComparator.java:42)
>>>>        at 
>>>> org.apache.hadoop.util.ReflectionUtils.setConf(ReflectionUtils.java:73)
>>>>        at 
>>>> org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:133)
>>>>        at 
>>>> org.apache.tez.runtime.library.common.ConfigUtils.getIntermediateInputKeyComparator(ConfigUtils.java:135)
>>>>        at 
>>>> org.apache.tez.runtime.library.common.shuffle.impl.MergeManager.finalMerge(MergeManager.java:808)
>>>>        at 
>>>> org.apache.tez.runtime.library.common.shuffle.impl.MergeManager.close(MergeManager.java:465)
>>>>        at 
>>>> org.apache.tez.runtime.library.common.shuffle.impl.Shuffle$RunShuffleCallable.call(Shuffle.java:344)
>>>> 
>>>> 
>>>> Found following workaround:
>>>>        Configuration payloadConf = 
>>>> TezUtils.createConfFromUserPayload(edgeProperty.getEdgeDestination().getUserPayload());
>>>>        payloadConf.set("myCustomProperty", "value");
>>>>        
>>>> edgeProperty.getEdgeDestination().setUserPayload(TezUtils.createUserPayloadFromConf(payloadConf));
>>>> 
>>>> I think it boils down to the property being passed to the edge input but 
>>>> not to its destination!?
>>>> However, is there a smarter way of making that property available to all 
>>>> instantiations of the comparator?
>>>> I tried using
>>>>        edgeConfBuilder.setAdditionalConfiguration(...)
>>>>        edgeConfBuilder.configureOutput().setAdditionalConfiguration(…)
>>>> but that seems to filter out custom properties.
>>>> 
>>>> Also do you plan to use a non-configuration based payload mechanism for 
>>>> the edge stuff like you did for the input, output, processor ?
>>>> 
>>>> Any enlightenment appreciated!
>>>> Johannes
>>>> 
>>>> 
>>>> 
>>> 
>>> 
>> 
>> 
> 
