Re: TeraSort on Flink and Spark

2015-07-12 Thread Dongwon Kim
 transformations,
 (Text, Text) to (OptimizedText, Text) and  (OptimizedText, Text) to
 (Text, Text),
 can prevent Flink from performing a lot better.
 I don't have time to modify TeraInputFormat and TeraOutputFormat to
 read (String, String) pairs from HDFS and write (String, String) pairs
 to HDFS.
 Do you think one could get a better TeraSort result by using a new
 implementation of FileInputFormat<String,String>?
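
To make the question above concrete, here is a minimal, Flink-free sketch of the parsing step such an input format would need. It assumes the standard TeraSort record layout (100 bytes per record: a 10-byte key followed by a 90-byte value). The class name TeraRecordSketch and its parse method are hypothetical and are not part of TeraInputFormat or of Flink's API.

```java
import java.nio.charset.StandardCharsets;
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.Map;

/** Hypothetical helper: splits one fixed-width TeraSort record into a (String, String) pair. */
final class TeraRecordSketch {
    static final int KEY_LEN = 10;    // assumed TeraSort key width in bytes
    static final int VALUE_LEN = 90;  // assumed TeraSort value width in bytes
    static final int RECORD_LEN = KEY_LEN + VALUE_LEN;

    /**
     * Decodes with ISO-8859-1 so that every byte maps to exactly one char.
     * String.compareTo on the decoded keys then orders them the same way
     * as an unsigned byte-wise comparison of the raw keys.
     */
    static Map.Entry<String, String> parse(byte[] buf, int offset) {
        if (offset < 0 || buf.length - offset < RECORD_LEN) {
            throw new IllegalArgumentException("incomplete record at offset " + offset);
        }
        String key = new String(buf, offset, KEY_LEN, StandardCharsets.ISO_8859_1);
        String value = new String(buf, offset + KEY_LEN, VALUE_LEN, StandardCharsets.ISO_8859_1);
        return new SimpleImmutableEntry<>(key, value);
    }

    public static void main(String[] args) {
        byte[] record = new byte[RECORD_LEN];
        for (int i = 0; i < record.length; i++) {
            record[i] = (byte) ('A' + (i % 26));
        }
        Map.Entry<String, String> kv = parse(record, 0);
        System.out.println(kv.getKey().length() + " " + kv.getValue().length());
    }
}
```

Whether reading Strings directly is actually faster than converting Text on the fly would still have to be measured.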

 Regards,

 Dongwon Kim

 2015-07-03 3:29 GMT+09:00 Stephan Ewen se...@apache.org:
  Hello Dongwon Kim!
 
  Thank you for sharing these numbers with us.
 
  I have gone through your implementation and there are two things you
  could try:
 
  1)
 
  I see that you sort Hadoop's Text data type with Flink. I think this may
  be less efficient than if you sort String, or a Flink specific data type.

  For efficient byte operations on managed memory, Flink needs to understand
  the binary representation of the data type. Flink understands that for
  String and many other types, but not for Text.
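
The idea of sorting on a binary representation can be illustrated without Flink. The sketch below is only an illustration, not Flink's sorter: it assumes a fixed 10-byte prefix (matching the TeraSort key width) and compares prefixes byte by byte as unsigned values. The names NormalizedKeySketch, normalizedKey and comparePrefixes are hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

/** Hypothetical illustration of comparing keys via fixed-width byte prefixes. */
final class NormalizedKeySketch {
    static final int PREFIX_LEN = 10; // assumed prefix width

    /** Returns a fixed-width prefix: longer keys are truncated, shorter keys are zero-padded. */
    static byte[] normalizedKey(byte[] key) {
        return Arrays.copyOf(key, PREFIX_LEN);
    }

    /** Compares two prefixes lexicographically, treating each byte as unsigned. */
    static int comparePrefixes(byte[] a, byte[] b) {
        for (int i = 0; i < PREFIX_LEN; i++) {
            int diff = (a[i] & 0xff) - (b[i] & 0xff);
            if (diff != 0) {
                return diff;
            }
        }
        return 0; // equal prefixes: a real sorter would fall back to a full comparison
    }

    public static void main(String[] args) {
        byte[] k1 = normalizedKey("AAAAAAAAAB".getBytes(StandardCharsets.US_ASCII));
        byte[] k2 = normalizedKey("AAAAAAAAAC".getBytes(StandardCharsets.US_ASCII));
        System.out.println(comparePrefixes(k1, k2) < 0); // prints "true"
    }
}
```

Because the prefixes have a fixed width, they can be compared directly on raw bytes without deserializing the objects they came from.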
 
  There are two things you can do:
    - First, try what happens if you map the Hadoop Text type to a Java
      String (only for the tera key).
    - Second, you can try what happens if you wrap the Hadoop Text type in a
      Flink type that supports optimized binary sorting. I have pasted code
      for that at the bottom of this email.
 
  2)
 
  You can see if it helps performance if you enable object re-use in Flink.
  You can do this on the ExecutionEnvironment via
  env.getConfig().enableObjectReuse(). Then Flink tries to use the same
  objects repeatedly, in case they are mutable.
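
What reusing mutable objects implies for user code can be shown with plain Java. The sketch below does not use Flink's API. It only illustrates the general pattern, under the assumption that a runtime hands the same mutable instance to user code for every record. The names ObjectReuseSketch and MutableRecord are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical illustration of the aliasing hazard when one mutable object is reused. */
final class ObjectReuseSketch {
    /** A mutable holder, standing in for a mutable type such as Hadoop's Text. */
    static final class MutableRecord {
        String key;
    }

    /** Keeps references to the reused holder, so every element ends up showing the last key. */
    static List<MutableRecord> collectWithoutCopy(String[] keys) {
        MutableRecord reused = new MutableRecord();
        List<MutableRecord> out = new ArrayList<>();
        for (String k : keys) {
            reused.key = k;   // overwrite in place, no new allocation
            out.add(reused);  // all elements alias the same object
        }
        return out;
    }

    /** Keeps the immutable value instead of the holder, which is safe under reuse. */
    static List<String> collectWithCopy(String[] keys) {
        MutableRecord reused = new MutableRecord();
        List<String> out = new ArrayList<>();
        for (String k : keys) {
            reused.key = k;
            out.add(reused.key);
        }
        return out;
    }

    public static void main(String[] args) {
        String[] keys = {"a", "b", "c"};
        System.out.println(collectWithoutCopy(keys).get(0).key); // prints "c"
        System.out.println(collectWithCopy(keys).get(0));        // prints "a"
    }
}
```

Reuse saves allocations, but code that holds on to an input object across calls has to copy what it needs first.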
 
 
  Can you try these options out and see how they affect Flink's runtime?
 
 
  Greetings,
  Stephan
 
  -
  Code for optimized sortable (Java):
 
  public final class OptimizedText implements NormalizableKey<OptimizedText> {

      private final Text text;

      public OptimizedText() {
          this.text = new Text();
      }

      public OptimizedText(Text from) {
          this.text = from;
      }

      public Text getText() {
          return text;
      }

      @Override
      public int getMaxNormalizedKeyLen() {
          return 10;
      }

      @Override
      public void copyNormalizedKey(MemorySegment memory, int offset, int len) {
          memory.put(offset, text.getBytes(), 0,
                  Math.min(text.getLength(), Math.min(10, len)));
      }

      @Override
      public void write(DataOutputView out) throws IOException {
          text.write(out);
      }

      @Override
      public void read(DataInputView in) throws IOException {
          text.readFields(in);
      }

      @Override
      public int compareTo(OptimizedText o) {
          return this.text.compareTo(o.text);
      }
  }
 
  -
  Converting Text to OptimizedText (Java code)
 
  map(new MapFunction<Tuple2<Text, Text>, Tuple2<OptimizedText, Text>>() {
      @Override
      public Tuple2<OptimizedText, Text> map(Tuple2<Text, Text> value) {
          return new Tuple2<OptimizedText, Text>(new OptimizedText(value.f0), value.f1);
      }
  })
 
 
 
 
  On Thu, Jul 2, 2015 at 6:47 PM, Dongwon Kim eastcirc...@postech.ac.kr
  wrote:
 
  Hello,
 
  I'd like to share my code for TeraSort on Flink and Spark which uses
  the same range partitioner as Hadoop TeraSort:
  https://github.com/eastcirclek/terasort
 
  I also wrote a short report on it:
  http://eastcirclek.blogspot.kr/2015/06/terasort-for-spark-and-flink-with-range.html
  In the blog post, I make a simple performance comparison between
  Flink, Spark, Tez, and MapReduce.
 
  I hope it will be helpful to you guys!
  Thanks.
 
  Dongwon Kim
  Postdoctoral Researcher @ Postech
 
 






Re: TeraSort on Flink and Spark

2015-07-12 Thread Hawin Jiang





Re: TeraSort on Flink and Spark

2015-07-10 Thread Stephan Ewen



Re: TeraSort on Flink and Spark

2015-07-10 Thread Fabian Hueske



TeraSort on Flink and Spark

2015-07-02 Thread Dongwon Kim
Hello,

I'd like to share my code for TeraSort on Flink and Spark which uses
the same range partitioner as Hadoop TeraSort:
https://github.com/eastcirclek/terasort

I also wrote a short report on it:
http://eastcirclek.blogspot.kr/2015/06/terasort-for-spark-and-flink-with-range.html
In the blog post, I make a simple performance comparison between
Flink, Spark, Tez, and MapReduce.

I hope it will be helpful to you guys!
Thanks.

Dongwon Kim
Postdoctoral Researcher @ Postech