It does not look like scala vs python thing. How big is your audience data
store? Can it be broadcasted?

What is the memory footprint you are seeing? At what point yarn is killing?
Depeneding on that you may want to tweak around number of partitions of
input dataset and increase number of executors

Ayan


On Sat, 15 Apr 2017 at 2:10 am, Patrick McCarthy <pmccar...@dstillery.com>
wrote:

> Hello,
>
> I'm trying to build an ETL job which takes in 30-100gb of text data and
> prepares it for SparkML. I don't speak Scala so I've been trying to
> implement in PySpark on YARN, Spark 2.1.
>
> Despite the transformations being fairly simple, the job always fails by
> running out of executor memory.
>
> The input table is long (~6bn rows) but composed of three simple values:
>
> #####################################################################
> all_data_long.printSchema()
>
> root
> |-- id: long (nullable = true)
> |-- label: short (nullable = true)
> |-- segment: string (nullable = true)
>
> #####################################################################
>
> First I join it to a table of particular segments of interests and do an
> aggregation,
>
> #####################################################################
>
> audiences.printSchema()
>
> root
>  |-- entry: integer (nullable = true)
>  |-- descr: string (nullable = true)
>
>
> print("Num in adl: {}".format(str(all_data_long.count())))
>
> aud_str = audiences.select(audiences['entry'].cast('string'),
>         audiences['descr'])
>
> alldata_aud = all_data_long.join(aud_str,
>         all_data_long['segment']==aud_str['entry'],
>         'left_outer')
>
> str_idx  = StringIndexer(inputCol='segment',outputCol='indexedSegs')
>
> idx_df   = str_idx.fit(alldata_aud)
> label_df =
> idx_df.transform(alldata_aud).withColumnRenamed('label','label_val')
>
> id_seg = (label_df
>         .filter(label_df.descr.isNotNull())
>         .groupBy('id')
>         .agg(collect_list('descr')))
>
> id_seg.write.saveAsTable("hive.id_seg")
>
> #####################################################################
>
> Then, I use that StringIndexer again on the first data frame to featurize
> the segment ID
>
> #####################################################################
>
> alldat_idx =
> idx_df.transform(all_data_long).withColumnRenamed('label','label_val')
>
> #####################################################################
>
>
> My ultimate goal is to make a SparseVector, so I group the indexed
> segments by id and try to cast it into a vector
>
> #####################################################################
>
> list_to_sparse_udf = udf(lambda l, maxlen: Vectors.sparse(maxlen, {v:1.0
> for v in l}),VectorUDT())
>
> alldat_idx.cache()
>
> feature_vec_len = (alldat_idx.select(max('indexedSegs')).first()[0] + 1)
>
> print("alldat_dix: {}".format(str(alldat_idx.count())))
>
> feature_df = (alldat_idx
>         .withColumn('label',alldat_idx['label_val'].cast('double'))
>         .groupBy('id','label')
>
> .agg(sort_array(collect_list('indexedSegs')).alias('collect_list_is'))
>         .withColumn('num_feat',lit(feature_vec_len))
>
> .withColumn('features',list_to_sparse_udf('collect_list_is','num_feat'))
>         .drop('collect_list_is')
>         .drop('num_feat'))
>
> feature_df.cache()
> print("Num in featuredf: {}".format(str(feature_df.count())))  ## <-
> failure occurs here
>
> #####################################################################
>
> Here, however, I always run out of memory on the executors (I've twiddled
> driver and executor memory to check) and YARN kills off my containers. I've
> gone as high as —executor-memory 15g but it still doesn't help.
>
> Given the number of segments is at most 50,000 I'm surprised that a
> smallish row-wise operation is enough to blow up the process.
>
>
> Is it really the UDF that's killing me? Do I have to rewrite it in Scala?
>
>
>
>
>
> Query plans for the failing stage:
>
> #####################################################################
>
>
> == Parsed Logical Plan ==
> Aggregate [count(1) AS count#265L]
> +- Project [id#0L, label#183, features#208]
>    +- Project [id#0L, label#183, num_feat#202, features#208]
>       +- Project [id#0L, label#183, collect_list_is#197, num_feat#202,
> <lambda>(collect_list_is#197, num_feat#202) AS features#208]
>          +- Project [id#0L, label#183, collect_list_is#197, 56845.0 AS
> num_feat#202]
>             +- Aggregate [id#0L, label#183], [id#0L, label#183,
> sort_array(collect_list(indexedSegs#93, 0, 0), true) AS collect_list_is#197]
>                +- Project [id#0L, label_val#99, segment#2, indexedSegs#93,
> cast(label_val#99 as double) AS label#183]
>                   +- Project [id#0L, label#1 AS label_val#99, segment#2,
> indexedSegs#93]
>                      +- Project [id#0L, label#1, segment#2,
> UDF(cast(segment#2 as string)) AS indexedSegs#93]
>                         +- MetastoreRelation pmccarthy, all_data_long
>
> == Analyzed Logical Plan ==
> count: bigint
> Aggregate [count(1) AS count#265L]
> +- Project [id#0L, label#183, features#208]
>    +- Project [id#0L, label#183, num_feat#202, features#208]
>       +- Project [id#0L, label#183, collect_list_is#197, num_feat#202,
> <lambda>(collect_list_is#197, num_feat#202) AS features#208]
>          +- Project [id#0L, label#183, collect_list_is#197, 56845.0 AS
> num_feat#202]
>             +- Aggregate [id#0L, label#183], [id#0L, label#183,
> sort_array(collect_list(indexedSegs#93, 0, 0), true) AS collect_list_is#197]
>                +- Project [id#0L, label_val#99, segment#2, indexedSegs#93,
> cast(label_val#99 as double) AS label#183]
>                   +- Project [id#0L, label#1 AS label_val#99, segment#2,
> indexedSegs#93]
>                      +- Project [id#0L, label#1, segment#2,
> UDF(cast(segment#2 as string)) AS indexedSegs#93]
>                         +- MetastoreRelation pmccarthy, all_data_long
>
> == Optimized Logical Plan ==
> Aggregate [count(1) AS count#265L]
> +- Project
>    +- InMemoryRelation [id#0L, label#183, features#208], true, 10000,
> StorageLevel(disk, memory, deserialized, 1 replicas)
>          +- *Project [id#0L, label#183, pythonUDF0#244 AS features#208]
>             +- BatchEvalPython [<lambda>(collect_list_is#197, 56845.0)],
> [id#0L, label#183, collect_list_is#197, pythonUDF0#244]
>                +- SortAggregate(key=[id#0L, label#183],
> functions=[collect_list(indexedSegs#93, 0, 0)], output=[id#0L, label#183,
> collect_list_is#197])
>                   +- *Sort [id#0L ASC NULLS FIRST, label#183 ASC NULLS
> FIRST], false, 0
>                      +- Exchange hashpartitioning(id#0L, label#183, 200)
>                         +- *Project [id#0L, indexedSegs#93,
> cast(label_val#99 as double) AS label#183]
>                            +- InMemoryTableScan [id#0L, indexedSegs#93,
> label_val#99]
>                                  +- InMemoryRelation [id#0L, label_val#99,
> segment#2, indexedSegs#93], true, 10000, StorageLevel(disk, memory,
> deserialized, 1 replicas)
>                                        +- *Project [id#0L, label#1 AS
> label_val#99, segment#2, UDF(segment#2) AS indexedSegs#93]
>                                           +- HiveTableScan [id#0L,
> label#1, segment#2], MetastoreRelation pmccarthy, all_data_long
>
> == Physical Plan ==
> *HashAggregate(keys=[], functions=[count(1)], output=[count#265L])
> +- Exchange SinglePartition
>    +- *HashAggregate(keys=[], functions=[partial_count(1)],
> output=[count#284L])
>       +- InMemoryTableScan
>             +- InMemoryRelation [id#0L, label#183, features#208], true,
> 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
>                   +- *Project [id#0L, label#183, pythonUDF0#244 AS
> features#208]
>                      +- BatchEvalPython [<lambda>(collect_list_is#197,
> 56845.0)], [id#0L, label#183, collect_list_is#197, pythonUDF0#244]
>                         +- SortAggregate(key=[id#0L, label#183],
> functions=[collect_list(indexedSegs#93, 0, 0)], output=[id#0L, label#183,
> collect_list_is#197])
>                            +- *Sort [id#0L ASC NULLS FIRST, label#183 ASC
> NULLS FIRST], false, 0
>                               +- Exchange hashpartitioning(id#0L,
> label#183, 200)
>                                  +- *Project [id#0L, indexedSegs#93,
> cast(label_val#99 as double) AS label#183]
>                                     +- InMemoryTableScan [id#0L,
> indexedSegs#93, label_val#99]
>                                           +- InMemoryRelation [id#0L,
> label_val#99, segment#2, indexedSegs#93], true, 10000, StorageLevel(disk,
> memory, deserialized, 1 replicas)
>                                                 +- *Project [id#0L,
> label#1 AS label_val#99, segment#2, UDF(segment#2) AS indexedSegs#93]
>                                                    +- HiveTableScan
> [id#0L, label#1, segment#2], MetastoreRelation pmccarthy, all_data_long
>
>
> --
Best Regards,
Ayan Guha

Reply via email to