Fantastic, Fabian, that was it :-)! I'm glad it wasn't a more severe or tricky
programming error, though I had already spent quite some time puzzling over
this one.
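
For anyone who finds this thread later, here is a minimal sketch of the kind of fix involved. It is not the actual DawnData code: the names IdentityKey, ContentKey and partitionOf are made up for illustration, and the modulo-based partitioning is only a simplified stand-in for what Flink really does.

```scala
// Hypothetical stand-ins for a key type; not the real DawnData class.
object StableHashSketch {

  // No hashCode override: falls back to Object.hashCode, which is
  // identity-based and unrelated to the content of the key.
  final class IdentityKey(val id: String)

  // equals and hashCode are both derived from the content, so equal
  // keys hash identically in every task and on every JVM.
  final class ContentKey(val id: String) {
    override def equals(other: Any): Boolean = other match {
      case that: ContentKey => id == that.id
      case _                => false
    }
    override def hashCode(): Int = id.hashCode
  }

  // Simplified illustration of hash partitioning (an assumption made
  // for this sketch, not Flink's actual partitioner).
  def partitionOf(key: Any, parallelism: Int): Int =
    java.lang.Math.floorMod(key.hashCode, parallelism)
}
```

With ContentKey, two separately deserialized copies of the same buyer id always end up in the same partition. With IdentityKey there is no such guarantee, which would explain why the results only went wrong once the parallelism was raised above 1.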

Have a nice day!

- Pieter



2015-09-16 14:27 GMT+02:00 Fabian Hueske <fhue...@gmail.com>:

> Sorry, I was overcomplicating things. Forget about the methods I
> mentioned.
>
> If you are implementing WritableComparable types, you need to override the
> hashCode() method.
> Flink treats WritableComparable types just like Hadoop does [1].
> DawnData does not override hashCode(), so it falls back to the
> identity-based Object.hashCode(). Equal keys then get different hash
> values, which causes inconsistent hash partitioning.
>
> Please let me know if that solved your problem.
>
> Cheers, Fabian
>
> [1]
> https://squarecog.wordpress.com/2011/02/20/hadoop-requires-stable-hashcode-implementations/
>
> 2015-09-16 14:12 GMT+02:00 Pieter Hameete <phame...@gmail.com>:
>
>> Hi,
>>
>> I haven't been able to find the problem yet, and I don't know exactly how
>> to check the methods you suggested earlier (extractKeys,
>> getFlatComparators, duplicate) for the Scala API. Do you have some pointers
>> for me on how I can check these myself?
>>
>> In my earlier mail I stated that maps, filters and reduces work fine. I
>> found that this was not correct: for my previous queries I have only used
>> maps and filters. I ran an extra test and found that the following
>> code using a reduce also produces faulty results when increasing the
>> parallelism past 1:
>>
>> def auctions : DataSet[DawnData] =
>>   env.readFile(new XML2DawnInputFormat(auctionInput), path)
>> def test = auctions
>>   .groupBy(_.select("2.buyer.@person").getFirst)
>>   .reduceGroup((groupedauctions, out : Collector[DawnData]) => {
>>     out.collect(new DawnData(groupedauctions.size))
>>   }).setParallelism(1)
>> test.print
>>
>> Does this indicate that something else could be wrong with the custom
>> datatype?
>>
>> You can find the corresponding code and a small dataset at
>> https://github.com/PHameete/dawn-flink in the *development* branch. It
>> is a Scala Maven project so you should be able to run the
>> *main.scala.wis.dawnflink.performance.DawnBenchmarkSuite* class out of
>> the box to run the query from my first email. In this class you can also
>> change the query that's being run or run multiple queries. If this does not
>> work, please let me know!
>>
>> Kind regards and cheers again!
>>
>> - Pieter
>>
>>
>>
>>
>> 2015-09-16 11:24 GMT+02:00 Pieter Hameete <phame...@gmail.com>:
>>
>>> Cheers Till and Fabian for your fast replies, it's much appreciated!
>>>
>>> I figured something must be wrong with my data type. I have no doubt
>>> the CoGroup works just fine :-) What I am looking for are pointers on what
>>> to investigate about my datatype. Initially I had problems with
>>> serialization causing strange issues as well, but these were resolved after
>>> I rewrote my serialization, so I believe that is working OK.
>>>
>>> I'll try looking into the data type some more with your tips. If I can't
>>> figure it out I'll share the repository with you later today or tomorrow.
>>>
>>> Kind regards,
>>>
>>> Pieter
>>>
>>>
>>>
>>>
>>>
>>> 2015-09-16 11:02 GMT+02:00 Fabian Hueske <fhue...@gmail.com>:
>>>
>>>> This sounds like a problem with your custom type and its (presumably)
>>>> custom serializers and comparators.
>>>>
>>>> I assume it is not an issue of partitioning or sorting because Reduce
>>>> is working fine, as you reported.
>>>> CoGroup also partitions and sorts data, but it compares the elements of
>>>> two sorted streams.
>>>>
>>>> I would check the following methods:
>>>> - extractKeys
>>>> - getFlatComparators
>>>> - duplicate (duplicate must return a deep copy, esp. of all nested
>>>> comparators)
>>>>
>>>> Feel free to share your custom TypeInfo, Comparator, and Serializers.
>>>>
>>>> Cheers, Fabian
>>>>
>>>> 2015-09-16 10:52 GMT+02:00 Till Rohrmann <till.rohrm...@gmail.com>:
>>>>
>>>>> Hi Pieter,
>>>>>
>>>>> your code doesn't look suspicious at first glance. Would it be
>>>>> possible for you to post a complete example with data (the data could
>>>>> also be included in the code) to reproduce your problem?
>>>>>
>>>>> Cheers,
>>>>> Till
>>>>>
>>>>> On Wed, Sep 16, 2015 at 10:31 AM, Pieter Hameete <phame...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Dear fellow Flinkers,
>>>>>>
>>>>>> I am implementing queries from the XMark (
>>>>>> http://www.ins.cwi.nl/projects/xmark/) benchmark on Flink using a
>>>>>> custom nested data type. Reading the XML data generated by the XMark
>>>>>> generator into my custom nested datatype works perfectly, and the queries
>>>>>> that I have implemented so far using mostly map, reduce and filter
>>>>>> produce correct results.
>>>>>>
>>>>>> For the next query I wish to cogroup a dataset containing person data
>>>>>> with a dataset containing auction data, joined by the *personid* of
>>>>>> the person and the *personid* of the buyer of an auction, so that I
>>>>>> can count the number of purchases of a person. I select this *personid*
>>>>>> as key from the custom nested data type in the *where* and *equalTo*
>>>>>> functions of the *coGroup*. The XML2DawnInputFormat is my custom
>>>>>> input format that reads XML into my custom nested datatype *DawnData*.
>>>>>> The 'inputGraph' and 'auctionInput' are a projection on the XML input to
>>>>>> prevent reading unnecessary data.
>>>>>>
>>>>>> def env = ExecutionEnvironment.getExecutionEnvironment
>>>>>> def persons : DataSet[DawnData] =
>>>>>>   env.readFile(new XML2DawnInputFormat(inputGraph), path)
>>>>>> def auctions : DataSet[DawnData] =
>>>>>>   env.readFile(new XML2DawnInputFormat(auctionInput), path)
>>>>>> def result = persons.coGroup(auctions)
>>>>>>   .where(person => { person.select("2/@id/2") })
>>>>>>   .equalTo(auction => { auction.select("2/buyer/@person/2") })
>>>>>>   .apply((personsres, auctionsres, out : Collector[DawnData]) => {
>>>>>>     // my cogroup function here that outputs the name of the person
>>>>>>     // and the number of auctions
>>>>>>   }).setParallelism(1)
>>>>>>
>>>>>> This code works fine with parallelism set to 1 as above. My issue is
>>>>>> that if I raise the parallelism of the coGroup above 1 the data will get
>>>>>> mixed up. Often the auctions Iterator will be empty, and sometimes there
>>>>>> are non-empty auction iterators passed to the cogroup function where the
>>>>>> persons iterator is empty, but this is impossible because all buyers
>>>>>> exist in the persons database!
>>>>>>
>>>>>> If anyone has some pointers as to why this code starts producing
>>>>>> strange results when parallelism is set above 1, this would be greatly
>>>>>> appreciated :-)
>>>>>>
>>>>>> Kind regards.
>>>>>>
>>>>>> Pieter Hameete
>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
