Re: TeraSort on Flink and Spark
transformations, (Text, Text) to (OptimizedText, Text) and (OptimizedText, Text) to (Text, Text), can prevent Flink from performing a lot better. I don't have time to modify TeraInputFormat and TeraOutputFormat to read (String, String) pairs from HDFS and write (String, String) pairs to HDFS. Do you think one could get a better TeraSort result with a new implementation of FileInputFormat<String, String>?

Regards,
Dongwon Kim

2015-07-03 3:29 GMT+09:00 Stephan Ewen se...@apache.org:

Hello Dongwon Kim!

Thank you for sharing these numbers with us.

I have gone through your implementation, and there are two things you could try:

1) I see that you sort Hadoop's Text data type with Flink. I think this may be less efficient than sorting a String or a Flink-specific data type. For efficient byte operations on managed memory, Flink needs to understand the binary representation of the data type. Flink understands that for String and many other types, but not for Text. There are two things you can do:
- First, try what happens if you map the Hadoop Text type to a Java String (only for the tera key).
- Second, try what happens if you wrap the Hadoop Text type in a Flink type that supports optimized binary sorting. I have pasted code for that at the bottom of this email.

2) Check whether enabling object re-use in Flink helps performance. You can do this on the ExecutionEnvironment via env.getConfig().enableObjectReuse(). Flink then tries to use the same objects repeatedly, in case they are mutable.

Can you try these options out and see how they affect Flink's runtime?
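Greetings,
Stephan

- Code for optimized sortable (Java):

    import java.io.IOException;

    import org.apache.flink.core.memory.DataInputView;
    import org.apache.flink.core.memory.DataOutputView;
    import org.apache.flink.core.memory.MemorySegment;
    import org.apache.flink.types.NormalizableKey;
    import org.apache.hadoop.io.Text;

    public final class OptimizedText implements NormalizableKey<OptimizedText> {

        private final Text text;

        public OptimizedText() {
            this.text = new Text();
        }

        public OptimizedText(Text from) {
            this.text = from;
        }

        public Text getText() {
            return text;
        }

        @Override
        public int getMaxNormalizedKeyLen() {
            // TeraSort keys are 10 bytes long.
            return 10;
        }

        @Override
        public void copyNormalizedKey(MemorySegment memory, int offset, int len) {
            int n = Math.min(text.getLength(), Math.min(10, len));
            memory.put(offset, text.getBytes(), 0, n);
            // Zero-pad shorter keys so no stale bytes in the sort buffer are compared.
            for (int i = n; i < len; i++) {
                memory.put(offset + i, (byte) 0);
            }
        }

        @Override
        public void write(DataOutputView out) throws IOException {
            text.write(out);
        }

        @Override
        public void read(DataInputView in) throws IOException {
            text.readFields(in);
        }

        @Override
        public int compareTo(OptimizedText o) {
            return this.text.compareTo(o.text);
        }

        // Value-based equality, consistent with compareTo.
        @Override
        public boolean equals(Object obj) {
            return obj instanceof OptimizedText && text.equals(((OptimizedText) obj).text);
        }

        @Override
        public int hashCode() {
            return text.hashCode();
        }
    }

- Converting Text to OptimizedText (Java code):

    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.java.tuple.Tuple2;

    map(new MapFunction<Tuple2<Text, Text>, Tuple2<OptimizedText, Text>>() {
        @Override
        public Tuple2<OptimizedText, Text> map(Tuple2<Text, Text> value) {
            return new Tuple2<OptimizedText, Text>(new OptimizedText(value.f0), value.f1);
        }
    })

On Thu, Jul 2, 2015 at 6:47 PM, Dongwon Kim eastcirc...@postech.ac.kr wrote:

Hello,

I'd like to share my code for TeraSort on Flink and Spark, which uses the same range partitioner as Hadoop TeraSort: https://github.com/eastcirclek/terasort

I also wrote a short report on it: http://eastcirclek.blogspot.kr/2015/06/terasort-for-spark-and-flink-with-range.html
In the blog post, I make a simple performance comparison between Flink, Spark, Tez, and MapReduce.

I hope it will be helpful to you guys!

Thanks.

Dongwon Kim
Postdoctoral Researcher @ Postech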
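To illustrate why a normalized key helps, here is a minimal, JDK-only sketch of the idea behind copyNormalizedKey in the OptimizedText class above. It is not Flink's actual sorter: each key's first 10 bytes are copied into a fixed-width, zero-padded prefix, records are ordered by comparing those prefixes as unsigned bytes, and the full key is consulted only when two prefixes tie. The class name NormalizedKeySortSketch and its methods are hypothetical; the 10-byte width is an assumption that mirrors getMaxNormalizedKeyLen() and the TeraSort key length.

```java
import java.util.Arrays;

// Hypothetical, JDK-only illustration of normalized-key sorting; not Flink code.
final class NormalizedKeySortSketch {

    // Assumed prefix width, mirroring getMaxNormalizedKeyLen() == 10.
    static final int PREFIX_LEN = 10;

    // A record's fixed-width normalized key plus its full key for tie-breaking.
    static final class Entry {
        final byte[] prefix;
        final byte[] key;

        Entry(byte[] key) {
            this.key = key;
            // Arrays.copyOf truncates long keys and zero-pads short ones.
            this.prefix = Arrays.copyOf(key, PREFIX_LEN);
        }
    }

    // Lexicographic comparison treating bytes as unsigned (0..255);
    // a key sorts before any longer key it is a prefix of.
    static int compareUnsigned(byte[] a, byte[] b) {
        int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            int c = Integer.compare(a[i] & 0xFF, b[i] & 0xFF);
            if (c != 0) {
                return c;
            }
        }
        return Integer.compare(a.length, b.length);
    }

    // Orders keys by their fixed-width prefix; the full key is compared only on ties.
    static byte[][] sort(byte[][] keys) {
        Entry[] entries = new Entry[keys.length];
        for (int i = 0; i < keys.length; i++) {
            entries[i] = new Entry(keys[i]);
        }
        Arrays.sort(entries, (x, y) -> {
            int c = compareUnsigned(x.prefix, y.prefix);
            return c != 0 ? c : compareUnsigned(x.key, y.key);
        });
        byte[][] sorted = new byte[keys.length][];
        for (int i = 0; i < entries.length; i++) {
            sorted[i] = entries[i].key;
        }
        return sorted;
    }

    public static void main(String[] args) {
        byte[][] keys = { {(byte) 0xFF}, {0x01, 0x02}, {0x01} };
        for (byte[] k : sort(keys)) {
            System.out.println(Arrays.toString(k));
        }
    }
}
```

The zero padding keeps prefix order consistent with full-key order, so for keys of at most 10 bytes (as in TeraSort) the prefix comparison alone decides every case except exact equality.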
TeraSort on Flink and Spark
Hello,

I'd like to share my code for TeraSort on Flink and Spark, which uses the same range partitioner as Hadoop TeraSort: https://github.com/eastcirclek/terasort

I also wrote a short report on it: http://eastcirclek.blogspot.kr/2015/06/terasort-for-spark-and-flink-with-range.html
In the blog post, I make a simple performance comparison between Flink, Spark, Tez, and MapReduce.

I hope it will be helpful to you guys!

Thanks.

Dongwon Kim
Postdoctoral Researcher @ Postech
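For readers unfamiliar with range partitioning, here is a simplified, JDK-only sketch of the general idea: split points are drawn from a sorted sample of keys, and each key is routed by binary search to the partition whose range contains it. This is not the code from the linked repository, and Hadoop TeraSort's own partitioner routes keys with a trie built over sampled split points rather than a plain binary search. The class name RangePartitionerSketch, the even spacing of split points, and the convention that a key equal to a split point goes to the upper partition are all assumptions made for illustration.

```java
import java.util.Arrays;

// Hypothetical, JDK-only sketch of sample-based range partitioning.
final class RangePartitionerSketch {

    // Sorted split points; partition p holds keys k with
    // splitPoints[p - 1] <= k < splitPoints[p].
    private final byte[][] splitPoints;

    RangePartitionerSketch(byte[][] sample, int numPartitions) {
        if (numPartitions < 1 || sample.length == 0) {
            throw new IllegalArgumentException("need a non-empty sample and at least one partition");
        }
        byte[][] sorted = sample.clone();
        Arrays.sort(sorted, RangePartitionerSketch::compareUnsigned);
        // Take numPartitions - 1 evenly spaced keys from the sorted sample.
        splitPoints = new byte[numPartitions - 1][];
        for (int i = 1; i < numPartitions; i++) {
            splitPoints[i - 1] = sorted[(int) ((long) i * sorted.length / numPartitions)];
        }
    }

    // Binary search: the partition index is the number of split points <= key.
    int partition(byte[] key) {
        int lo = 0;
        int hi = splitPoints.length;
        while (lo < hi) {
            int mid = (lo + hi) >>> 1;
            if (compareUnsigned(splitPoints[mid], key) <= 0) {
                lo = mid + 1;
            } else {
                hi = mid;
            }
        }
        return lo;
    }

    // Lexicographic comparison treating bytes as unsigned (0..255).
    static int compareUnsigned(byte[] a, byte[] b) {
        int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            int c = Integer.compare(a[i] & 0xFF, b[i] & 0xFF);
            if (c != 0) {
                return c;
            }
        }
        return Integer.compare(a.length, b.length);
    }

    public static void main(String[] args) {
        byte[][] sample = { {10}, {20}, {30}, {40} };
        RangePartitionerSketch p = new RangePartitionerSketch(sample, 2);
        for (byte[] key : new byte[][] { {5}, {29}, {30}, {(byte) 0xF0} }) {
            System.out.println(Arrays.toString(key) + " -> partition " + p.partition(key));
        }
    }
}
```

Because every key in partition p is smaller than every key in partition p + 1, sorting each partition locally and concatenating the outputs in partition order yields a globally sorted result; how well the sample reflects the key distribution determines how balanced the partitions are.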