Assume I have a large book with many chapters and many lines of text. Assume I also have a function that gives the similarity of two lines of text. The objective is, for every line, to find the most similar other line in the same chapter within 200 lines of it. The real problem involves biology and is beyond this discussion.
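To pin down what "within 200 lines" means, here is a brute-force sequential reference. It is a minimal sketch that assumes the window is 200 line numbers in either direction, that a line never matches itself, and that ties keep the first candidate in list order. It is O(n²) and only useful for checking a distributed version on small inputs. `Line`, `Similarity`, `PositionalMatch` (a toy scorer) and `BruteForceMatcher` are hypothetical names; the real scorer is the biology-specific function.

```java
import java.util.List;

// Hypothetical stand-in for one line of the book (mirrors the question's LineAndLocation).
final class Line {
    final int chapter;
    final int lineNumber;
    final String text;

    Line(int chapter, int lineNumber, String text) {
        this.chapter = chapter;
        this.lineNumber = lineNumber;
        this.text = text;
    }
}

// Hypothetical similarity interface; a higher score means more similar.
interface Similarity {
    double score(String a, String b);
}

// Toy scorer for testing only: counts positions where both strings have the same character.
final class PositionalMatch implements Similarity {
    @Override
    public double score(String a, String b) {
        int same = 0;
        for (int k = 0; k < Math.min(a.length(), b.length()); k++) {
            if (a.charAt(k) == b.charAt(k)) {
                same++;
            }
        }
        return same;
    }
}

final class BruteForceMatcher {
    // For each index i, returns the index of the most similar other line in the same chapter
    // whose line number differs by at most 'window', or -1 if there is no such line.
    static int[] bestMatches(List<Line> lines, Similarity sim, int window) {
        int[] best = new int[lines.size()];
        for (int i = 0; i < lines.size(); i++) {
            Line a = lines.get(i);
            double bestScore = Double.NEGATIVE_INFINITY;
            best[i] = -1;
            for (int j = 0; j < lines.size(); j++) {
                Line b = lines.get(j);
                boolean candidate = i != j
                        && a.chapter == b.chapter
                        && Math.abs(a.lineNumber - b.lineNumber) <= window;
                if (candidate) {
                    double s = sim.score(a.text, b.text);
                    if (s > bestScore) { // strict '>' keeps the first candidate on ties
                        bestScore = s;
                        best[i] = j;
                    }
                }
            }
        }
        return best;
    }
}
```

With a window of 200, an identical line 399 lines away, or one in another chapter, is never reported as a match.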
In the code shown below I convert each line with its location into a Tuple2 where the location is the key. Next I want to partition by chapter (I think that is right). Then, for every chapter, I want to look at the lines in order of location, keep the last 200 locations (as LineAndLocationMatch objects), search them to update the best fit, and record a best fit for every line. When a line is more than 200 lines away from the current line, it can be added to the returned JavaRDD. I know how to do the map and generate the similarity scores (doubles), but not how to do the sort and the reduce, or even what the reduce function arguments look like.

Please use Java function classes, not lambdas, in any sample. I am a strong-typing guy, and seeing the declared type (JavaRDD, JavaPairRDD, ...) after each step in a series of . operations really helps me understand what is happening. I expect my reduce function to look like void reduceFunction(KeyClass key, Iterator<LineAndLocation> values), but with some way to emit the best-fit LineAndLocationMatch objects generated as the values are iterated. There is no reason to think that all the objects will fit in memory. It is also important for the function doing the reduce to know the key. I am very lost about what the reduce should look like. Under the covers, the reduce involves a lot of Java code that knows very little about Spark and Hadoop.
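The per-chapter pass described above (keep the last 200 lines, update best fits, emit a line once it is more than 200 lines behind the current one) can be written in plain Java with no Spark types at all. This is a minimal sketch that assumes the input iterator is already sorted by chapter and then line number, that the similarity is symmetric, and that the window covers 200 line numbers on either side. Instead of `void reduceFunction(KeyClass key, Iterator<LineAndLocation> values)`, it takes the sorted iterator and returns a lazy `Iterator<LineAndLocationMatch>`, so results stream out and only the current window is held in memory. `WindowedBestMatchIterator` is a hypothetical name, and the match class carries an extra `bestScore` field not in the question.

```java
import java.util.ArrayDeque;
import java.util.Iterator;
import java.util.NoSuchElementException;

// One line of the book (same fields as the question's LineAndLocation).
final class LineAndLocation {
    final int chapter;
    final int lineNumber;
    final String line;

    LineAndLocation(int chapter, int lineNumber, String line) {
        this.chapter = chapter;
        this.lineNumber = lineNumber;
        this.line = line;
    }
}

// A line plus the best fit seen so far; bestFit stays null if nothing falls in its window.
final class LineAndLocationMatch {
    final LineAndLocation thisLine;
    LineAndLocation bestFit;
    double bestScore = Double.NEGATIVE_INFINITY;

    LineAndLocationMatch(LineAndLocation thisLine) {
        this.thisLine = thisLine;
    }
}

// Assumed symmetric: getSimilarity(a, b) == getSimilarity(b, a).
interface SimilarityFunction {
    double getSimilarity(String s1, String s2);
}

// Hypothetical "reduce" step: reads lines sorted by (chapter, lineNumber) and lazily emits each
// line's best match once no later line can fall inside its window.
final class WindowedBestMatchIterator implements Iterator<LineAndLocationMatch> {
    private final Iterator<LineAndLocation> input;
    private final SimilarityFunction sim;
    private final int window;
    private final ArrayDeque<LineAndLocationMatch> open = new ArrayDeque<>();  // best fit may still change
    private final ArrayDeque<LineAndLocationMatch> ready = new ArrayDeque<>(); // final, not yet returned

    WindowedBestMatchIterator(Iterator<LineAndLocation> input, SimilarityFunction sim, int window) {
        this.input = input;
        this.sim = sim;
        this.window = window;
    }

    @Override
    public boolean hasNext() {
        // Pull input until at least one match is final or the input runs out.
        while (ready.isEmpty() && input.hasNext()) {
            advance(input.next());
        }
        if (ready.isEmpty()) {
            // Input exhausted: nothing can change the remaining open matches.
            ready.addAll(open);
            open.clear();
        }
        return !ready.isEmpty();
    }

    @Override
    public LineAndLocationMatch next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        return ready.poll();
    }

    private void advance(LineAndLocation next) {
        // Retire lines from an earlier chapter or more than 'window' lines behind 'next';
        // because the input is sorted, no later line can match them either.
        while (!open.isEmpty()) {
            LineAndLocation oldest = open.peekFirst().thisLine;
            if (oldest.chapter != next.chapter || next.lineNumber - oldest.lineNumber > window) {
                ready.add(open.pollFirst());
            } else {
                break;
            }
        }
        // Score the new line against everything still in the window, updating both sides.
        LineAndLocationMatch current = new LineAndLocationMatch(next);
        for (LineAndLocationMatch other : open) {
            double s = sim.getSimilarity(next.line, other.thisLine.line);
            if (s > current.bestScore) {
                current.bestScore = s;
                current.bestFit = other.thisLine;
            }
            if (s > other.bestScore) {
                other.bestScore = s;
                other.bestFit = next;
            }
        }
        open.addLast(current);
    }
}
```

The chapter plays the role of the key: the iterator sees each chapter's lines contiguously and closes the window whenever the chapter changes, so no separate key argument is needed. An iterator-in, iterator-out shape like this is what Spark's `mapPartitions` expects (in the Spark 2.x Java API, `FlatMapFunction.call` returns an `Iterator`).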
My pseudocode looks like this, as far as I have it working:

```java
import java.io.Serializable;

import org.apache.spark.Partitioner;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

// one line in the book (Serializable because Spark ships these between nodes)
static class LineAndLocation implements Serializable {
    int chapter;
    int lineNumber;
    String line;
}

// one line in the book together with its best match
static class LineAndLocationMatch implements Serializable {
    LineAndLocation thisLine;
    LineAndLocation bestFit;
}

// location - acts as a key
static class KeyClass implements Serializable {
    int chapter;
    int lineNumber;

    KeyClass(final int pChapter, final int pLineNumber) {
        chapter = pChapter;
        lineNumber = pLineNumber;
    }
}

// used to compute the best fit
static class SimilarityFunction implements Serializable {
    double getSimilarity(String s1, String s2) {
        return 0; // todo do work here
    }
}

// This function returns an RDD with best match objects
public static JavaRDD<LineAndLocationMatch> findBestMatchesLikeHadoop(JavaRDD<LineAndLocation> inputs) {
    // So this is what the mapper does - make key value pairs
    JavaPairRDD<KeyClass, LineAndLocation> mappedKeys = inputs.mapToPair(
            new PairFunction<LineAndLocation, KeyClass, LineAndLocation>() {
                @Override
                public Tuple2<KeyClass, LineAndLocation> call(final LineAndLocation v) throws Exception {
                    return new Tuple2<KeyClass, LineAndLocation>(new KeyClass(v.chapter, v.lineNumber), v);
                }
            });

    // Partition by chapters ?? is this right??
    mappedKeys = mappedKeys.partitionBy(new Partitioner() {
        @Override
        public int numPartitions() {
            return 20;
        }

        @Override
        public int getPartition(final Object key) {
            return ((KeyClass) key).chapter % numPartitions();
        }
    });

    // Now I get very fuzzy - for every partition I want to sort on line number
    JavaPairRDD<KeyClass, LineAndLocation> sortedKeys = ??? WHAT HAPPENS HERE

    // Now I need to do a reduce operation. What I want is
    JavaRDD<LineAndLocationMatch> bestMatches = sortedKeys.<SOME FUNCTION>();
    return bestMatches;
}
```
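The `??? WHAT HAPPENS HERE` step needs two pieces: a partition choice that depends only on the chapter, and an ordering by chapter and then line number. `JavaPairRDD` has a `repartitionAndSortWithinPartitions(Partitioner, Comparator<K>)` method that does the shuffle and the per-partition sort in one step (Spark's documentation describes it as pushing the sort down into the shuffle machinery). The sketch below shows only those two pieces in plain Java; `ChapterThenLineComparator` and `ChapterPartitioning` are hypothetical names. The key and the comparator both implement `Serializable` because Spark has to send them to executors.

```java
import java.io.Serializable;
import java.util.Comparator;

// Same fields as the question's KeyClass, with value equality for use in any keyed operation.
final class KeyClass implements Serializable {
    final int chapter;
    final int lineNumber;

    KeyClass(int chapter, int lineNumber) {
        this.chapter = chapter;
        this.lineNumber = lineNumber;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof KeyClass)) {
            return false;
        }
        KeyClass k = (KeyClass) o;
        return chapter == k.chapter && lineNumber == k.lineNumber;
    }

    @Override
    public int hashCode() {
        return 31 * chapter + lineNumber;
    }
}

// Orders keys by chapter, then line number. A partition holds several whole chapters,
// so chapter must come first to keep each chapter's lines contiguous.
final class ChapterThenLineComparator implements Comparator<KeyClass>, Serializable {
    @Override
    public int compare(KeyClass a, KeyClass b) {
        int byChapter = Integer.compare(a.chapter, b.chapter);
        return byChapter != 0 ? byChapter : Integer.compare(a.lineNumber, b.lineNumber);
    }
}

// Partition index depends only on the chapter, so a chapter never spans two partitions.
// floorMod keeps the result in [0, numPartitions) even for a negative chapter number.
final class ChapterPartitioning {
    static int partitionFor(KeyClass key, int numPartitions) {
        return Math.floorMod(key.chapter, numPartitions);
    }
}
```

Under these assumptions, the chain would be roughly `mappedKeys.repartitionAndSortWithinPartitions(partitioner, new ChapterThenLineComparator())`, replacing the separate `partitionBy` call (with the anonymous `Partitioner` returning `ChapterPartitioning.partitionFor(...)`), followed by `mapPartitions` over the sorted `Tuple2<KeyClass, LineAndLocation>` iterator to produce the `JavaRDD<LineAndLocationMatch>`.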