Thanks for the confirmation! We're quite new to Spark, so a little
reassurance is a good thing to have sometimes :-)

What's concerning me at the moment is that the job doesn't seem to run any
faster when we add more compute resources to the cluster, and this is
proving a little tricky to debug. There are a lot of variables, so here's
what we've tried already and the apparent impact of each. If anyone has any
further suggestions, we'd love to hear them!

* Increase the "minimum" number of output files (targetPartitions in the
code quoted below), so that input groups smaller than our minimum chunk
size can still be worked on by more than one executor. This does measurably
speed things up, but it's obviously a trade-off, as the original goal of
this job is to merge our data into fewer, larger files. (There's a small
sketch of this change just after the list.)

* Submit many jobs in parallel, by running the merge code quoted below in a
Callable on an executor pool (roughly as in the second sketch after the
list). This seems to help to some extent, but I'm not sure what else needs
to be configured alongside it -- driver threads, scheduling policy, etc. We
set the scheduler to "FAIR" when doing this, as that seemed like the right
approach, but we're not 100% confident. It did seem to help quite
substantially, so perhaps this just needs further tuning?

* Increasing executors, RAM, etc. This doesn't make a difference by itself
for this job, which suggests we're not fully utilising the resources we
already have in a smaller cluster.
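
For reference, the change in the first bullet is just a higher floor in the
partition calculation from the snippet quoted below. A minimal sketch -- the
constants here are placeholders rather than our real numbers, and totalSize
is computed as in that snippet:

    // Placeholder values -- the real numbers depend on cluster size and data volume.
    final long TARGET_SIZE_BYTES = 512L * 1024 * 1024; // desired size of each output file
    final int minOutputFiles = 16;                     // raised from 2; roughly our executor count

    // Same formula as the quoted code, but with a higher floor so that small
    // input groups still get split across several executors.
    int targetPartitions = (int) Math.max(minOutputFiles, totalSize / TARGET_SIZE_BYTES);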
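
And here's roughly the shape of the parallel submission from the second
bullet. This is a simplified sketch rather than our production code: the
pool size, the placeholder paths and the mergeGroup helper are illustrative,
and the coalesce/partitioning logic from the quoted snippet is left out for
brevity.

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;
    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.sql.DataFrame;
    import org.apache.spark.sql.SQLContext;
    import org.apache.spark.sql.SaveMode;

    public class ParallelMergeSketch {

        public static void main(String[] args) throws Exception {
            // FAIR scheduling lets concurrently submitted jobs share the executors
            // instead of queueing behind one another (the default is FIFO).
            SparkConf conf = new SparkConf()
                    .setAppName("avro-to-parquet-merge")
                    .set("spark.scheduler.mode", "FAIR");
            JavaSparkContext sc = new JavaSparkContext(conf);
            final SQLContext sqlContext = new SQLContext(sc);

            // Placeholder work list; the real job derives these from our data layout.
            List<List<String>> inputGroups = Arrays.asList(
                    Arrays.asList("s3://bucket/in/group1/a", "s3://bucket/in/group1/b"),
                    Arrays.asList("s3://bucket/in/group2/a"));

            ExecutorService pool = Executors.newFixedThreadPool(8); // pool size is a tunable guess
            List<Future<Void>> results = new ArrayList<Future<Void>>();
            for (int g = 0; g < inputGroups.size(); ++g) {
                final List<String> group = inputGroups.get(g);
                final String outputPath = "s3://bucket/out/merged-" + g; // placeholder
                results.add(pool.submit(new Callable<Void>() {
                    @Override
                    public Void call() throws Exception {
                        mergeGroup(sqlContext, group, outputPath);
                        return null;
                    }
                }));
            }
            for (Future<Void> f : results) {
                f.get(); // surface any failure from the worker threads
            }
            pool.shutdown();
            sc.stop();
        }

        // Essentially the snippet quoted below, wrapped up per input group.
        private static void mergeGroup(SQLContext sqlContext, List<String> inputPaths,
                                       String outputPath) {
            DataFrame df = sqlContext.load(inputPaths.get(0), "com.databricks.spark.avro");
            for (int i = 1; i < inputPaths.size(); ++i) {
                df = df.unionAll(sqlContext.load(inputPaths.get(i), "com.databricks.spark.avro"));
            }
            df.save(outputPath, "parquet", SaveMode.Overwrite);
        }
    }

We haven't set per-thread scheduler pools
(sc.setLocalProperty("spark.scheduler.pool", ...)) or a fairscheduler.xml,
which may well be part of the tuning we're missing.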

Again, any recommendations appreciated. Thanks for the help!


James.

On 4 June 2015 at 15:00, Eugen Cepoi <cepoi.eu...@gmail.com> wrote:

> Hi
>
> 2015-06-04 15:29 GMT+02:00 James Aley <james.a...@swiftkey.com>:
>
>> Hi,
>>
>> We have a load of Avro data coming into our data systems in the form of
>> relatively small files, which we're merging into larger Parquet files with
>> Spark. I've been following the docs, and the approach I'm taking seemed
>> fairly obvious and pleasingly simple, but I'm wondering if perhaps it's
>> not the most efficient approach.
>>
>> I was wondering if anyone on this list might have some advice on making
>> this job as efficient as possible. Here's some code:
>>
>> DataFrame dfInput = sqlContext.load(inputPaths.get(0), "com.databricks.spark.avro");
>> long totalSize = getDirSize(inputPaths.get(0));
>>
>> for (int i = 1; i < inputPaths.size(); ++i) {
>>     dfInput = dfInput.unionAll(sqlContext.load(inputPaths.get(i), "com.databricks.spark.avro"));
>>     totalSize += getDirSize(inputPaths.get(i));
>> }
>>
>> int targetPartitions = (int) Math.max(2L, totalSize / TARGET_SIZE_BYTES);
>> DataFrame outputFrame;
>>
>> // Note: HADOOP-10456 impacts us, as we're stuck on Hadoop 2.4.0 in EMR,
>> // hence the synchronized block below. Suggestions welcome here too! :-)
>> synchronized (this) {
>>     RDD<Row> inputRdd = dfInput.rdd().coalesce(targetPartitions, false, null);
>>     outputFrame = sqlContext.createDataFrame(inputRdd, dfInput.schema());
>> }
>>
>> outputFrame.save(outputPath, "parquet", SaveMode.Overwrite);
>>
>> Here are some things bothering me:
>>
>>    - Conversion to an RDD and back again so that we can use coalesce()
>>    to reduce the number of partitions. This is because we read that
>>    repartition() is not as efficient as coalesce(), and local micro
>>    benchmarks seemed to somewhat confirm that this was faster. Is this
>>    really a good idea though? Should we be doing something else?
>>
> Repartition uses coalesce but with a forced shuffle step. It's just a
> shortcut for coalesce(xxx, true).
> Doing a coalesce sounds correct, I'd do the same :) Note that if you add
> the shuffle step, then your partitions should be better balanced.
>
>>
>>    - Usage of unionAll() - this is the only way I could find to join the
>>    separate data sets into a single data frame to save as Parquet. Is there a
>>    better way?
>>
> When using the input formats directly, you can do this with
> FileInputFormat.addInputPath; it should perform at least as well as union.
>
>>
>>    - Do I need to be using the DataFrame API at all? I'm not querying
>>    any data here, so the nice API for SQL-like transformations of the data
>>    isn't being used. The DataFrame API just seemed like the path of least
>>    resistance for working with Avro and Parquet. Would there be any advantage
>>    to using hadoopRDD() with the appropriate Input/Output formats?
>>
>>
>>
> Using the input/output formats directly sounds viable. But the snippet you
> show seems clean enough, and I'm not sure there would be much value in
> making something (maybe) slightly faster but harder to understand.
>
>
> Eugen
>
>> Any advice or tips greatly appreciated!
>>
>>
>> James.
>>
>>
>>
>
