Re: Optimisation advice for Avro-Parquet merge job

2015-06-12 Thread James Aley
Hey Kiran,

Thanks very much for the response. I left for vacation before I could try
this out, but I'll experiment once I get back and let you know how it goes.

Thanks!

James.

On 8 June 2015 at 12:34, kiran lonikar loni...@gmail.com wrote:

 It turns out my assumption on load and unionAll being blocking is not
 correct. They are transformations. So instead of just running only the load
 and unionAll in the run() methods, I think you will have to save the
 intermediate dfInput[i] to temp (parquet) files (possibly to an in-memory
 DFS like http://tachyon-project.org/) in the run() methods. The second for
 loop will also have to load from the intermediate parquet files. Then
 finally save the final dfInput[0] to HDFS.

 I think this way of parallelizing will force the cluster to utilize all
 the resources.

 -Kiran


Re: Optimisation advice for Avro-Parquet merge job

2015-06-08 Thread kiran lonikar
James,

As far as I can see, there are three distinct parts to your program:

   - for loop
   - synchronized block
   - final outputFrame.save statement

Can you do a separate timing measurement by putting a simple
System.currentTimeMillis() around each of these blocks to see how long each
one takes, and then try to optimize whichever takes longest? In the second
block, you may want to measure the time for the two statements separately.
Improving that part mostly boils down to tuning Spark settings.
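
Something like this, as a rough sketch (the comments stand in for your actual
blocks):

long t0 = System.currentTimeMillis();
// ... first block: the for loop that unions the input DataFrames ...
long t1 = System.currentTimeMillis();
// ... second block: the synchronized block with the coalesce ...
long t2 = System.currentTimeMillis();
outputFrame.save(outputPath, "parquet", SaveMode.Overwrite);
long t3 = System.currentTimeMillis();
System.out.println("for loop: " + (t1 - t0) + " ms, coalesce: " + (t2 - t1)
    + " ms, save: " + (t3 - t2) + " ms");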

Now consider the first block: I think this is a classic case of a merge sort
or a reduce tree. You have already tried to improve this by submitting jobs
in parallel using an executor pool/Callables, etc.

To further improve the parallelization, I suggest a reduce-tree-like
approach. For example, let's say you want to compute the sum of all the
elements of an array in parallel. The way it's solved on a GPU-like platform
is that you pair element i with element i + n/2, compute those n/2 sums in
parallel on separate threads, and save each result in the first of the two
elements. In the next iteration, you compute n/4 sums of those partial sums
in parallel, and so on, until you are left with only two elements whose sum
gives you the final sum.
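
As a plain-Java illustration of the idea on an ordinary array (a sketch only,
assuming the length is a power of two; it pairs index i with index i + stride,
the same indexing the DataFrame version below uses):

// reduce-tree sum of arr; each level's iterations are independent,
// so they could run on separate threads
static long treeSum(long[] arr) {
    int n = arr.length;
    for (int stride = n / 2; stride >= 1; stride /= 2) {
        for (int i = 0; i < stride; i++) {
            arr[i] += arr[i + stride];
        }
    }
    return arr[0];
}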

You are performing many sequential unionAll operations over inputs.size()
Avro files. Assuming unionAll() on DataFrame is blocking (and not a simple
transformation, as it is on RDDs) and actually performs the union operation,
you will certainly benefit from parallelizing this loop. You could change the
loop to something like the following:

// pseudo code only -- exception handling omitted; assumes n is a power of two
final int n = inputs.size();
ExecutorService executor = Executors.newFixedThreadPool(n / 2);
final DataFrame[] dfInput = new DataFrame[n / 2];
List<Future<?>> futures = new ArrayList<Future<?>>();

// first pass: union input i with input i + n/2, in parallel
for (int i = 0; i < n / 2; i++) {
    final int idx = i;
    futures.add(executor.submit(new Runnable() {
        public void run() {
            // showing [] only to bring out array access; replace with dfInput(i) in your code
            dfInput[idx] = sqlContext.load(inputPaths.get(idx), "com.databricks.spark.avro")
                .unionAll(sqlContext.load(inputPaths.get(idx + n / 2), "com.databricks.spark.avro"));
        }
    }));
}
for (Future<?> f : futures) f.get(); // wait for this level to finish

// later passes: halve the number of partial unions at each level
int steps = (int) (Math.log(n) / Math.log(2.0));
for (int s = 2; s <= steps; s++) {
    final int stride = n / (1 << s); // n / 2^s
    futures.clear();
    for (int i = 0; i < stride; i++) {
        final int idx = i;
        futures.add(executor.submit(new Runnable() {
            public void run() {
                // union of partial results idx and idx + stride
                dfInput[idx] = dfInput[idx].unionAll(dfInput[idx + stride]);
            }
        }));
    }
    for (Future<?> f : futures) f.get(); // wait before starting the next level
}
// dfInput[0] now holds the union of all inputs
executor.shutdown();

Let me know if it helped.

-Kiran



Re: Optimisation advice for Avro-Parquet merge job

2015-06-08 Thread kiran lonikar
It turns out my assumption on load and unionAll being blocking is not
correct. They are transformations. So instead of just running only the load
and unionAll in the run() methods, I think you will have to save the
intermediate dfInput[i] to temp (parquet) files (possibly to an in-memory DFS
like http://tachyon-project.org/) in the run() methods. The second for loop
will also have to load from the intermediate parquet files. Then finally
save the final dfInput[0] to HDFS.
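
For instance, the run() bodies might become something like this (sketch only;
tempDir is a placeholder path, which could just as well be a tachyon:// URI,
and idx, s, stride and n are the variables from my earlier pseudocode):

// first pass: materialize each pairwise union as a temp parquet directory
sqlContext.load(inputPaths.get(idx), "com.databricks.spark.avro")
    .unionAll(sqlContext.load(inputPaths.get(idx + n / 2), "com.databricks.spark.avro"))
    .save(tempDir + "/level1/part_" + idx, "parquet", SaveMode.Overwrite);

// later passes: load the previous level's intermediates, union, write the next level
sqlContext.load(tempDir + "/level" + (s - 1) + "/part_" + idx, "parquet")
    .unionAll(sqlContext.load(tempDir + "/level" + (s - 1) + "/part_" + (idx + stride), "parquet"))
    .save(tempDir + "/level" + s + "/part_" + idx, "parquet", SaveMode.Overwrite);

At the end, the last level's part_0 is what you would load and save to HDFS.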

I think this way of parallelizing will force the cluster to utilize all of
its resources.

-Kiran


Re: Optimisation advice for Avro-Parquet merge job

2015-06-04 Thread James Aley
Thanks for the confirmation! We're quite new to Spark, so a little
reassurance is a good thing to have sometimes :-)

The thing that's concerning me at the moment is that my job doesn't seem to
run any faster with more compute resources added to the cluster, and this
is proving a little tricky to debug. There are a lot of variables, so
here's what we've tried already and the apparent impact. If anyone has any
further suggestions, we'd love to hear!

* Increase the minimum number of output files (targetPartitions above),
so that input groups smaller than our minimum chunk size can still be
worked on by more than one executor. This does measurably speed things up,
but obviously it's a trade-off, as the original goal for this job is to
merge our data into fewer, larger files.

* Submit many jobs in parallel, by running the above code in a Callable, on
an executor pool. This seems to help, to some extent, but I'm not sure what
else needs to be configured alongside it -- driver threads, scheduling
policy, etc. We set scheduling to FAIR when doing this (see the sketch after
this list), as that seemed like the right approach, but we're not 100%
confident. It seemed to help quite substantially anyway, so perhaps this
just needs further tuning?

* Increasing executors, RAM, etc. This doesn't make a difference by itself
for this job, so I'm thinking we're already not fully utilising the
resources we have in a smaller cluster.
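
For reference, setting FAIR mode looks something like the sketch below (the
pool name and allocation-file path are placeholders, and we're not sure how
much of the pool/allocation-file part is actually needed):

SparkConf conf = new SparkConf()
    .set("spark.scheduler.mode", "FAIR")
    // optional: pools can be defined in a fair-scheduler allocation file
    .set("spark.scheduler.allocation.file", "/path/to/fairscheduler.xml");
JavaSparkContext sc = new JavaSparkContext(conf);

// inside each Callable (i.e. on the thread that submits the Spark jobs):
sc.setLocalProperty("spark.scheduler.pool", "merge-jobs");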

Again, any recommendations appreciated. Thanks for the help!


James.



Re: Optimisation advice for Avro-Parquet merge job

2015-06-04 Thread Eugen Cepoi
Hi

2015-06-04 15:29 GMT+02:00 James Aley james.a...@swiftkey.com:

 Hi,

 We have a load of Avro data coming into our data systems in the form of
 relatively small files, which we're merging into larger Parquet files with
 Spark. I've been following the docs and the approach I'm taking seemed
 fairly obvious, and pleasingly simple, but I'm wondering if perhaps it's
 not the optimal approach.

 I was wondering if anyone on this list might have some advice on making
 this job as efficient as possible. Here's some code:

 DataFrame dfInput = sqlContext.load(inputPaths.get(0),
     "com.databricks.spark.avro");
 long totalSize = getDirSize(inputPaths.get(0));

 for (int i = 1; i < inputs.size(); ++i) {
     dfInput = dfInput.unionAll(sqlContext.load(inputPaths.get(i),
         "com.databricks.spark.avro"));
     totalSize += getDirSize(inputPaths.get(i));
 }

 int targetPartitions = (int) Math.max(2L, totalSize / TARGET_SIZE_BYTES);
 DataFrame outputFrame;

 // Note: HADOOP-10456 impacts us, as we're stuck on 2.4.0 in EMR, hence
 // the synchronized block below. Suggestions welcome here too! :-)
 synchronized (this) {
     RDD<Row> inputRdd = dfInput.rdd().coalesce(targetPartitions, false, null);
     outputFrame = sqlContext.createDataFrame(inputRdd, dfInput.schema());
 }

 outputFrame.save(outputPath, "parquet", SaveMode.Overwrite);

 Here are some things bothering me:

- Conversion to an RDD and back again so that we can use coalesce() to
reduce the number of partitions. This is because we read that repartition()
is not as efficient as coalesce(), and local micro benchmarks seemed to
somewhat confirm that this was faster. Is this really a good idea though?
Should we be doing something else?

 Repartition uses coalesce but with a forced shuffle step. It's just a
shortcut for coalesce(xxx, true).
Doing a coalesce sounds correct, I'd do the same :) Note that if you add the
shuffle step, then your partitions should be better balanced.
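
In code, the equivalence is (illustrative only; rdd here stands for a
JavaRDD<Row> of your data):

// these two produce the same result; repartition always shuffles
JavaRDD<Row> a = rdd.repartition(targetPartitions);
JavaRDD<Row> b = rdd.coalesce(targetPartitions, true);
// coalesce without the shuffle flag just merges existing partitions locally
JavaRDD<Row> c = rdd.coalesce(targetPartitions, false);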


- Usage of unionAll() - this is the only way I could find to join the
separate data sets into a single data frame to save as Parquet. Is there a
better way?

 When using the input formats directly, you can do this with
FileInputFormat.addInputPath; it should perform at least as well as the union.
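
A rough sketch of that route (Hadoop's mapreduce FileInputFormat and Path; the
variable names are placeholders):

// accumulate every input directory into one Hadoop job configuration
Job job = Job.getInstance(new Configuration());
for (String path : inputPaths) {
    FileInputFormat.addInputPath(job, new Path(path));
}
// job.getConfiguration() could then be handed to sc.newAPIHadoopRDD(...) along
// with an Avro input format, instead of building one DataFrame per path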


- Do I need to be using the DataFrame API at all? I'm not querying any
data here, so the nice API for SQL-like transformations of the data isn't
being used. The DataFrame API just seemed like the path of least resistance
for working with Avro and Parquet. Would there be any advantage to using
hadoopRDD() with the appropriate Input/Output formats?



Using the input/output formats directly sounds viable. But the snippet you
show seems clean enough, and I am not sure there would be much value in
making something (maybe) slightly faster but harder to understand.


Eugen

Any advice or tips greatly appreciated!


 James.





Optimisation advice for Avro-Parquet merge job

2015-06-04 Thread James Aley
Hi,

We have a load of Avro data coming into our data systems in the form of
relatively small files, which we're merging into larger Parquet files with
Spark. I've been following the docs and the approach I'm taking seemed
fairly obvious, and pleasingly simple, but I'm wondering if perhaps it's not
the optimal approach.

I was wondering if anyone on this list might have some advice on making this
job as efficient as possible. Here's some code:

DataFrame dfInput = sqlContext.load(inputPaths.get(0),
    "com.databricks.spark.avro");
long totalSize = getDirSize(inputPaths.get(0));

for (int i = 1; i < inputs.size(); ++i) {
    dfInput = dfInput.unionAll(sqlContext.load(inputPaths.get(i),
        "com.databricks.spark.avro"));
    totalSize += getDirSize(inputPaths.get(i));
}

int targetPartitions = (int) Math.max(2L, totalSize / TARGET_SIZE_BYTES);
DataFrame outputFrame;

// Note: HADOOP-10456 impacts us, as we're stuck on 2.4.0 in EMR, hence
// the synchronized block below. Suggestions welcome here too! :-)
synchronized (this) {
    RDD<Row> inputRdd = dfInput.rdd().coalesce(targetPartitions, false, null);
    outputFrame = sqlContext.createDataFrame(inputRdd, dfInput.schema());
}

outputFrame.save(outputPath, "parquet", SaveMode.Overwrite);

Here are some things bothering me:

   - Conversion to an RDD and back again so that we can use coalesce() to
   reduce the number of partitions. This is because we read that repartition()
   is not as efficient as coalesce(), and local micro benchmarks seemed to
   somewhat confirm that this was faster. Is this really a good idea though?
   Should we be doing something else?
   - Usage of unionAll() - this is the only way I could find to join the
   separate data sets into a single data frame to save as Parquet. Is there a
   better way?
   - Do I need to be using the DataFrame API at all? I'm not querying any
   data here, so the nice API for SQL-like transformations of the data isn't
   being used. The DataFrame API just seemed like the path of least resistance
   for working with Avro and Parquet. Would there be any advantage to using
   hadoopRDD() with the appropriate Input/Output formats?


Any advice or tips greatly appreciated!


James.