Does DataSet/DataFrame support ReduceBy() as RDD does?

2019-06-24 Thread Qian He
I found RDD.reduceByKey() really useful and much more efficient than
groupBy(). Wondering if DS/DF have similar APIs?
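For reference, this is roughly the pattern I mean (just a sketch; the data and
column names below are made up):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.sum

val spark = SparkSession.builder.appName("reduce-demo").getOrCreate()
import spark.implicits._

// RDD: reduceByKey combines values per key on the map side before the
// shuffle, so much less data crosses the network than with a plain groupBy.
val rdd = spark.sparkContext.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))
val reducedRdd = rdd.reduceByKey(_ + _)

// The closest Dataset/DataFrame analogues I can find: groupByKey +
// reduceGroups, or groupBy().agg(), which also does a partial aggregation
// before the shuffle.
val ds = Seq(("a", 1), ("b", 2), ("a", 3)).toDS()
val reducedDs = ds.groupByKey(_._1).reduceGroups((a, b) => (a._1, a._2 + b._2))
val reducedDf = ds.toDF("k", "v").groupBy("k").agg(sum("v"))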


dynamic allocation in spark-shell

2019-05-30 Thread Qian He
Sometimes it's convenient to start a spark-shell on a cluster, like
./spark/bin/spark-shell --master yarn --deploy-mode client --num-executors
100 --executor-memory 15g --executor-cores 4 --driver-memory 10g --queue
myqueue
However, with a command like this, the allocated resources remain occupied
until the console exits.

Just wondering if it is possible to start a spark-shell with
dynamicAllocation enabled? If it is, how to specify the configs? Can anyone
give a quick example? Thanks!
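My best guess is something along these lines, though I'm not sure these are
the right or complete settings (I believe the external shuffle service also
has to be available on the cluster for executors to be released safely):

./spark/bin/spark-shell --master yarn --deploy-mode client \
  --conf spark.dynamicAllocation.enabled=true \
  --conf spark.shuffle.service.enabled=true \
  --conf spark.dynamicAllocation.minExecutors=1 \
  --conf spark.dynamicAllocation.maxExecutors=100 \
  --conf spark.dynamicAllocation.executorIdleTimeout=60s \
  --executor-memory 15g --executor-cores 4 --driver-memory 10g --queue myqueue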


Re: What is the difference for the following UDFs?

2019-05-14 Thread Qian He
Hi Jacek,

Thanks for your reply. The case you provided was actually the same as the
second option in my original email. What I'm wondering about is the
difference between the two in terms of query performance or efficiency.

On Tue, May 14, 2019 at 3:51 PM Jacek Laskowski  wrote:

> Hi,
>
> For this particular case I'd use Column.substr (
> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.Column),
> e.g.
>
> val ns = Seq(("hello world", 1, 5)).toDF("w", "b", "e")
> scala> ns.select($"w".substr($"b", $"e" - $"b" + 1) as "demo").show
> +-----+
> | demo|
> +-----+
> |hello|
> +-----+
>
> Pozdrawiam,
> Jacek Laskowski
> 
> https://about.me/JacekLaskowski
> Mastering Spark SQL https://bit.ly/mastering-spark-sql
> Spark Structured Streaming https://bit.ly/spark-structured-streaming
> Mastering Kafka Streams https://bit.ly/mastering-kafka-streams
> Follow me at https://twitter.com/jaceklaskowski
>
>
> On Tue, May 14, 2019 at 5:08 PM Qian He  wrote:
>
>> For example, I have a dataframe with 3 columns: URL, START, END. For each
>> url in the URL column, I want to fetch the substring of it starting at
>> START and ending at END.
>> +--------------+-----+---+
>> |URL           |START|END|
>> +--------------+-----+---+
>> |www.amazon.com|4    |14 |
>> |www.yahoo.com |4    |13 |
>> |www.amazon.com|4    |14 |
>> |www.google.com|4    |14 |
>> +--------------+-----+---+
>>
>> I have UDF1:
>>
>> def getSubString = (input: String, start: Int, end: Int) => {
>>input.substring(start, end)
>> }
>> val udf1 = udf(getSubString)
>>
>> and another UDF2:
>>
>> def getColSubString()(c1: Column, c2: Column, c3: Column): Column = {
>>c1.substr(c2, c3-c2)
>> }
>>
>> Let's assume they can both generate the result I want. But, from a
>> performance perspective, is there any difference between those two UDFs?
>>
>>
>>


What is the difference for the following UDFs?

2019-05-14 Thread Qian He
For example, I have a dataframe with 3 columns: URL, START, END. For each
url in the URL column, I want to fetch the substring of it starting at
START and ending at END.
+--------------+-----+---+
|URL           |START|END|
+--------------+-----+---+
|www.amazon.com|4    |14 |
|www.yahoo.com |4    |13 |
|www.amazon.com|4    |14 |
|www.google.com|4    |14 |
+--------------+-----+---+

I have UDF1:

def getSubString = (input: String, start: Int, end: Int) => {
   input.substring(start, end)
}
val udf1 = udf(getSubString)

and another UDF2:

def getColSubString()(c1: Column, c2: Column, c3: Column): Column = {
   c1.substr(c2, c3-c2)
}
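
For illustration, this is roughly how I would call them (just a sketch; df is
the dataframe above, and I believe these imports are what's needed):

import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.{col, udf}

// UDF1: a Scala function wrapped with udf(), so Spark treats it as a black box
val viaUdf1 = df.select(udf1(col("URL"), col("START"), col("END")) as "sub")

// UDF2: just a Column expression built from the built-in substr
val viaUdf2 = df.select(getColSubString()(col("URL"), col("START"), col("END")) as "sub")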

Let's assume they can both generate the result I want. But, from a
performance perspective, is there any difference between those two UDFs?


Train ML models on each partition

2019-05-08 Thread Qian He
I have a 1TB dataset with 100 columns. The first column is a user_id; there
are about 1000 unique user_ids in this 1TB dataset.

The use case: I want to train an ML model for each user_id on that user's
records (approximately 1GB of records per user). Say the ML model is a
Decision Tree. It is not feasible to create 1000 Spark applications to
achieve this. Can I launch just one Spark application and train all 1000 DT
models? How?

Can I just partition the 1TB data by user_id and then train a model on each
partition?
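
Roughly what I have in mind is the sketch below (trainLocalModel is just a
placeholder for some single-machine trainer, not a Spark API; the input path
is made up, and I'm assuming user_id is a string column):

import org.apache.spark.sql.{Row, SparkSession}

val spark = SparkSession.builder.appName("per-user-training").getOrCreate()
import spark.implicits._

// Placeholder standing in for fitting a single-machine decision tree on one
// user's rows; here it just counts them.
def trainLocalModel(rows: Iterator[Row]): Long = rows.size

val df = spark.read.parquet("/path/to/1tb/dataset")  // hypothetical path

// One group per user_id; each group (~1GB) is handled by a single task, so
// the trainer only ever sees one user's records at a time.
val models = df
  .groupByKey(_.getAs[String]("user_id"))
  .mapGroups { (userId, rows) => (userId, trainLocalModel(rows)) }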

Thanks!


Re: Spark LogisticRegression got stuck on dataset with millions of columns

2019-04-23 Thread Qian He
The dataset was already using a sparse representation before being fed into
LogisticRegression.
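
Concretely, the features were assembled roughly like this (just a sketch;
the dimension, indices, and label values below are illustrative, not my real
data, and spark is the shell's SparkSession):

import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.linalg.Vectors

// ~1.7M-dimensional feature vector with only a few non-zero 1.0 entries
val numFeatures = 1700000
val row = (1.0, Vectors.sparse(numFeatures, Array(3, 42, 1001), Array(1.0, 1.0, 1.0)))

val training = spark.createDataFrame(Seq(row)).toDF("label", "features")
val lr = new LogisticRegression().setMaxIter(10)
val model = lr.fit(training)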

On Tue, Apr 23, 2019 at 3:15 PM Weichen Xu 
wrote:

> Hi Qian,
>
> Does your dataset use the sparse vector format?
>
>
>
> On Mon, Apr 22, 2019 at 5:03 PM Qian He  wrote:
>
>> Hi all,
>>
>> I'm using the Spark-provided LogisticRegression to fit a dataset. Each row
>> of the data has 1.7 million columns, but it is sparse with only hundreds
>> of 1s. The Spark UI reported high GC time while the model was being
>> trained, and my Spark application got stuck without any response. I have
>> allocated 100 executors with 8g of memory each.
>>
>> Is there anything I should do to make the training process complete
>> successfully?
>>
>


Spark LogisticRegression got stuck on dataset with millions of columns

2019-04-22 Thread Qian He
Hi all,

I'm using the Spark-provided LogisticRegression to fit a dataset. Each row of
the data has 1.7 million columns, but it is sparse with only hundreds of 1s.
The Spark UI reported high GC time while the model was being trained, and my
Spark application got stuck without any response. I have allocated 100
executors with 8g of memory each.

Is there anything I should do to make the training process complete successfully?


subscribe

2019-03-04 Thread Qian He