Fantastic Fabian, that was it :-)! I'm glad it wasn't a more severe/tricky programming error, though I had already spent quite some time wondering about this one.
Have a nice day!

- Pieter

2015-09-16 14:27 GMT+02:00 Fabian Hueske <fhue...@gmail.com>:

> Sorry, I was overcomplicating things. Forget about the methods I
> mentioned.
>
> If you are implementing WritableComparable types, you need to override the
> hashCode() method.
> Flink treats WritableComparable types just like Hadoop [1].
> DawnData does not implement hashCode(), which causes inconsistent hash
> partitioning.
>
> Please let me know if that solved your problem.
>
> Cheers, Fabian
>
> [1]
> https://squarecog.wordpress.com/2011/02/20/hadoop-requires-stable-hashcode-implementations/
>
> 2015-09-16 14:12 GMT+02:00 Pieter Hameete <phame...@gmail.com>:
>
>> Hi,
>>
>> I haven't been able to find the problem yet, and I don't know exactly how
>> to check the methods you suggested earlier (extractKeys,
>> getFlatComparators, duplicate) for the Scala API. Do you have some pointers
>> for me on how I can check these myself?
>>
>> In my earlier mail I stated that maps, filters and reduces work fine. I
>> found that this was not correct: for my previous queries I have only used
>> maps and filters. I made an extra test and found that indeed the following
>> code using a reduce also generates faulty results when increasing
>> parallelism past 1:
>>
>> def auctions : DataSet[DawnData] = env.readFile(new XML2DawnInputFormat(auctionInput), path)
>> def test =
>>   auctions.groupBy(_.select("2.buyer.@person").getFirst).reduceGroup(
>>     (groupedauctions, out : Collector[DawnData]) => {
>>       out.collect(new DawnData(groupedauctions.size))
>>     }).setParallelism(1)
>> test.print
>>
>> Does this indicate that something else could be wrong with the custom
>> datatype?
>>
>> You can find the corresponding code and a small dataset at
>> https://github.com/PHameete/dawn-flink in the *development* branch. It
>> is a Scala Maven project so you should be able to run the
>> *main.scala.wis.dawnflink.performance.DawnBenchmarkSuite* class out of
>> the box to run the query from my first email. In this class you can also
>> change the query that's being run or run multiple queries. If this does not
>> work please let me know!
>>
>> Kind regards and cheers again!
>>
>> - Pieter
>>
>> 2015-09-16 11:24 GMT+02:00 Pieter Hameete <phame...@gmail.com>:
>>
>>> Cheers Till and Fabian for your fast replies, it's much appreciated!
>>>
>>> I figured something should be wrong with my data type. I have no doubt
>>> the CoGroup works just fine :-) It's pointers on what to investigate
>>> about my datatype that I am looking for. Initially I had problems with
>>> serialization causing strange issues as well; these were resolved after
>>> I had rewritten my serialization, so I believe that is working OK.
>>>
>>> I'll try looking into the data type some more with your tips. If I can't
>>> figure it out I'll share the repository with you later today or tomorrow.
>>>
>>> Kind regards,
>>>
>>> Pieter
>>>
>>> 2015-09-16 11:02 GMT+02:00 Fabian Hueske <fhue...@gmail.com>:
>>>
>>>> This sounds like a problem with your custom type and its (presumably)
>>>> custom serializers and comparators.
>>>>
>>>> I assume it is not an issue of partitioning or sorting because Reduce
>>>> is working fine, as you reported.
>>>> CoGroup does also partition and sort data, but compares the elements of
>>>> two sorted streams.
>>>>
>>>> I would check the following methods:
>>>> - extractKeys
>>>> - getFlatComparators
>>>> - duplicate (duplicate must return a deep copy, esp. of all nested
>>>>   comparators)
>>>>
>>>> Feel free to share your custom TypeInfo, Comparator, and Serializers.
>>>>
>>>> Cheers, Fabian
>>>>
>>>> 2015-09-16 10:52 GMT+02:00 Till Rohrmann <till.rohrm...@gmail.com>:
>>>>
>>>>> Hi Pieter,
>>>>>
>>>>> your code doesn't look suspicious at first glance. Would it be
>>>>> possible for you to post a complete example with data (also possible to
>>>>> include it in the code) to reproduce your problem?
>>>>>
>>>>> Cheers,
>>>>> Till
>>>>>
>>>>> On Wed, Sep 16, 2015 at 10:31 AM, Pieter Hameete <phame...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Dear fellow Flinkers,
>>>>>>
>>>>>> I am implementing queries from the XMark (
>>>>>> http://www.ins.cwi.nl/projects/xmark/) benchmark on Flink using a
>>>>>> custom nested data type. Reading the XML data generated by the XMark
>>>>>> generator into my custom nested datatype works perfectly, and the
>>>>>> queries that I have implemented so far using mostly map, reduce and
>>>>>> filter produce correct results.
>>>>>>
>>>>>> For the next query I wish to cogroup a dataset containing person data
>>>>>> with a dataset containing auction data, joined by the *personid* of
>>>>>> the person and the *personid* of the buyer of an auction, so that I
>>>>>> can count the number of purchases of a person. I select this
>>>>>> *personid* as key from the custom nested data type in the *where* and
>>>>>> *equalTo* functions of the *coGroup*. The XML2DawnInputFormat is my
>>>>>> custom input format that reads XML into my custom nested datatype
>>>>>> *DawnData*. The 'inputGraph' and 'auctionInput' are a projection on
>>>>>> the XML input to prevent reading unnecessary data.
>>>>>>
>>>>>> def env = ExecutionEnvironment.getExecutionEnvironment
>>>>>> def persons : DataSet[DawnData] = env.readFile(new XML2DawnInputFormat(inputGraph), path)
>>>>>> def auctions : DataSet[DawnData] = env.readFile(new XML2DawnInputFormat(auctionInput), path)
>>>>>> def result = persons.coGroup(auctions)
>>>>>>   .where(person => { person.select("2/@id/2") })
>>>>>>   .equalTo(auction => { auction.select("2/buyer/@person/2") })
>>>>>>   .apply((personsres, auctionsres, out : Collector[DawnData]) => {
>>>>>>     // my cogroup function here that outputs the name of the person
>>>>>>     // and the number of auctions
>>>>>>   }).setParallelism(1)
>>>>>>
>>>>>> This code works fine with parallelism set to 1 as above. My issue is
>>>>>> that if I raise the parallelism of the coGroup above 1 the data will
>>>>>> get mixed up. Often the auctions Iterator will be empty, and sometimes
>>>>>> there are non-empty auction iterators passed to the cogroup function
>>>>>> where the persons iterator is empty, but this is impossible because
>>>>>> all buyers exist in the persons database!
>>>>>>
>>>>>> If anyone has pointers on why this code starts producing strange
>>>>>> results when parallelism is set above 1, that would be greatly
>>>>>> appreciated :-)
>>>>>>
>>>>>> Kind regards.
>>>>>>
>>>>>> Pieter Hameete
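
For readers landing on this thread: the fix confirmed at the top (overriding hashCode() on the key type) can be illustrated with a small, dependency-free sketch. Everything named below is hypothetical. UnstableKey and StableKey are stand-ins, not the real DawnData class, and partition() is a simplified model of hash partitioning, not Flink's actual partitioner. In the real project the override would presumably go on the WritableComparable type itself.

```scala
// Hypothetical stand-in for a key type that overrides equals but NOT hashCode.
// It inherits the identity-based hashCode, so two equal keys created in
// different places (or different JVMs) generally hash differently.
class UnstableKey(val id: String) {
  override def equals(other: Any): Boolean = other match {
    case that: UnstableKey => this.id == that.id
    case _                 => false
  }
}

// Same stand-in, but with a value-based hashCode that agrees with equals.
// String.hashCode is computed from the characters, so it is stable across JVMs.
class StableKey(val id: String) {
  override def equals(other: Any): Boolean = other match {
    case that: StableKey => this.id == that.id
    case _               => false
  }
  override def hashCode(): Int = id.hashCode
}

object HashPartitionSketch {
  // Simplified model (an assumption, not Flink's real code): a record goes to
  // the partition given by its key's hash modulo the parallelism.
  def partition(key: Any, parallelism: Int): Int =
    Math.floorMod(key.hashCode, parallelism)

  def main(args: Array[String]): Unit = {
    val parallelism = 4
    // Two equal keys, as they might be deserialized in two different tasks.
    println(partition(new StableKey("person0"), parallelism) ==
            partition(new StableKey("person0"), parallelism))
    // With the identity hash the two partitions usually differ, which is how
    // matching persons and auctions can end up in different coGroup tasks.
    println(partition(new UnstableKey("person0"), parallelism) ==
            partition(new UnstableKey("person0"), parallelism))
  }
}
```

With parallelism 1 every key lands in the same partition regardless of its hash, which would be consistent with the observation that the queries only misbehaved once parallelism was raised above 1.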