How to increase the parallelism of Spark Streaming application?

2018-11-06 Thread JF Chen
I have a Spark Streaming application which reads data from Kafka and saves
the transformation result to HDFS.
My Kafka topic originally has 8 partitions, and I repartition the data to 100
to increase the parallelism of the Spark job.
Now I am wondering: if I increase the Kafka partition number to 100 instead of
repartitioning to 100, will performance improve? (I know the repartition step
costs a lot of CPU.)
If I set the Kafka partition number to 100, are there any downsides?
I only have one production environment, so it's not convenient for me to test
this myself.
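
For reference, here is a minimal Scala sketch of the kind of job I have in
mind, using the Kafka 0-10 direct stream; the broker address, topic, group id,
batch interval and output path below are placeholders rather than my real
configuration:

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

// All literals below are illustrative placeholders.
val conf = new SparkConf().setAppName("kafka-parallelism-sketch")
val ssc = new StreamingContext(conf, Seconds(30))

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "broker:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "example-group")

val stream = KafkaUtils.createDirectStream[String, String](
  ssc, PreferConsistent, Subscribe[String, String](Seq("my-topic"), kafkaParams))

// With the direct stream, each Kafka partition maps to exactly one Spark
// partition, so an 8-partition topic gives 8 tasks per batch unless the
// data is explicitly shuffled.
stream.map(_.value)
  .repartition(100)   // explicit shuffle from 8 to 100 partitions
  .foreachRDD { rdd =>
    rdd.saveAsTextFile(s"hdfs:///tmp/out/${System.currentTimeMillis}")
  }

ssc.start()
ssc.awaitTermination()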

Thanks!

Regards,
Junfeng Chen


SPARK-25959 - Difference in featureImportances results on computed vs saved models

2018-11-06 Thread Suraj Nayak
Hi Spark Users,

I tried to implement GBT and found that the feature importances computed while
the model was being fit are different from those of the same model after it
has been saved to storage and loaded back.

I also found that once the persisted model is loaded, saved again, and loaded
once more, the feature importances stay the same.

I am not sure whether this is a bug in storing and reading the model the first
time, or whether I am missing some parameter that needs to be set before
saving the model (so that the model picks up some defaults, causing the
feature importances to change).



*Below is the test code (imports included for completeness):*

import org.apache.spark.ml.classification.{GBTClassificationModel, GBTClassifier}
import org.apache.spark.ml.feature.VectorAssembler
import spark.implicits._  // for toDF; already in scope in spark-shell

val testDF = Seq(
(1, 3, 2, 1, 1),
(3, 2, 1, 2, 0),
(2, 2, 1, 1, 0),
(3, 4, 2, 2, 0),
(2, 2, 1, 3, 1)
).toDF("a", "b", "c", "d", "e")


val featureColumns = testDF.columns.filter(_ != "e")
// Assemble the features into a vector
val assembler = new VectorAssembler().setInputCols(featureColumns).setOutputCol("features")
// Transform the data to get the feature data set
val featureDF = assembler.transform(testDF)

// Train a GBT model.
val gbt = new GBTClassifier()
.setLabelCol("e")
.setFeaturesCol("features")
.setMaxDepth(2)
.setMaxBins(5)
.setMaxIter(10)
.setSeed(10)
.fit(featureDF)

gbt.transform(featureDF).show(false)

// Write out the model

featureColumns.zip(gbt.featureImportances.toArray).sortBy(-_._2).take(20).foreach(println)
/* Prints

(d,0.5931875075767403)
(a,0.3747184548362353)
(b,0.03209403758702444)
(c,0.0)

*/
gbt.write.overwrite().save("file:///tmp/test123")

println("Reading model again")
val gbtload = GBTClassificationModel.load("file:///tmp/test123")

featureColumns.zip(gbtload.featureImportances.toArray).sortBy(-_._2).take(20).foreach(println)

/*

Prints

(d,0.6455841215290767)
(a,0.3316126797964181)
(b,0.022803198674505094)
(c,0.0)

*/


gbtload.write.overwrite().save("file:///tmp/test123_rewrite")

val gbtload2 = GBTClassificationModel.load("file:///tmp/test123_rewrite")

featureColumns.zip(gbtload2.featureImportances.toArray).sortBy(-_._2).take(20).foreach(println)

/* prints
(d,0.6455841215290767)
(a,0.3316126797964181)
(b,0.022803198674505094)
(c,0.0)

*/

Any help is appreciated in making sure the feature importances are preserved
as-is when the model is first stored.

Thanks!


Re: Shuffle write explosion

2018-11-06 Thread Yichen Zhou
Hi Dillon,

Thank you for your reply.
The function in mapToPair() uses a PairFunction to transform the records into
a particular Parquet format. I have tried replacing mapToPair() with actions
like count() or collect(), but the huge shuffle write still occurred, so I
guess the shuffle write explosion has nothing to do with mapToPair().

Best Regards,
Yichen

Dillon Dukek wrote on Tue, Nov 6, 2018 at 7:21 AM:

> What is your function in mapToPair doing?
>
> -Dillon
>
> On Mon, Nov 5, 2018 at 1:41 PM Taylor Cox wrote:
>
>> At first glance, I wonder if your tables are partitioned? There may not
>> be enough parallelism happening. You can also pass in the number of
>> partitions and/or a custom partitioner to help Spark “guess” how to
>> organize the shuffle.
>>
>>
>>
>> Have you seen any of these docs?
>>
>>
>> https://people.eecs.berkeley.edu/~kubitron/courses/cs262a-F13/projects/reports/project16_report.pdf
>>
>> https://spark.apache.org/docs/latest/tuning.html
>>
>>
>>
>> Taylor
>>
>>
>>
>>
>>
>> *From:* Yichen Zhou 
>> *Sent:* Sunday, November 4, 2018 11:42 PM
>> *To:* user@spark.apache.org
>> *Subject:* Shuffle write explosion
>>
>>
>>
>> Hi All,
>>
>>
>>
>> When running a Spark job, I have 100MB+ of input but get more than 700GB
>> of shuffle write, which is really weird. The job finally ends up with an
>> OOM error. Does anybody know why this happens?
>>
>> [image: Screen Shot 2018-11-05 at 15.20.35.png]
>>
>> My code is like:
>>
>> JavaPairRDD<Text, Text> inputRDD =
>>     sc.sequenceFile(inputPath, Text.class, Text.class);
>>
>> inputRDD.repartition(partitionNum).mapToPair(...)
>>     .saveAsNewAPIHadoopDataset(job.getConfiguration());
>>
>>
>> Environment:
>>
>> *CPU 32 cores; Memory 256G; Storage 7.5G; CentOS 7.5*
>>
>> *java version "1.8.0_162"*
>>
>> *Spark 2.1.2*
>>
>>
>> Any help is greatly appreciated.
>>
>>
>>
>> Regards,
>>
>> Yichen
>>
>


Re: Spark 2.4.0 artifact in Maven repository

2018-11-06 Thread Bartosz Konieczny
Hi Matei,

Thanks for your answer; it's much clearer now. I was not aware of the time
needed for release preparation.

Best regards,
Bartosz.

On Tue, Nov 6, 2018 at 9:05 AM Matei Zaharia wrote:

> Hi Bartosz,
>
> This is because the vote on 2.4 has passed (you can see the vote thread on
> the dev mailing list) and we are just working to get the release into
> various channels (Maven, PyPI, etc), which can take some time. Expect to
> see an announcement soon once that’s done.
>
> Matei
>
> > On Nov 4, 2018, at 7:14 AM, Bartosz Konieczny wrote:
> >
> > Hi,
> >
> > Today I wanted to set up a development environment for GraphX, and when I
> > visited the Maven central repository
> > (https://mvnrepository.com/artifact/org.apache.spark/spark-graphx) I saw
> > that version 2.4.0 was already available. Does this mean that the new
> > version of Apache Spark was released? It seems quite surprising to me
> > because I didn't find any release information and the 2.4 artifact was
> > deployed on 29/10/2018. Maybe somebody here has some explanation for that?
> >
> > Best regards,
> > Bartosz Konieczny.
>
>


Re: Spark 2.4.0 artifact in Maven repository

2018-11-06 Thread Matei Zaharia
Hi Bartosz,

This is because the vote on 2.4 has passed (you can see the vote thread on the 
dev mailing list) and we are just working to get the release into various 
channels (Maven, PyPI, etc), which can take some time. Expect to see an 
announcement soon once that’s done.

Matei

> On Nov 4, 2018, at 7:14 AM, Bartosz Konieczny  wrote:
> 
> Hi,
> 
> Today I wanted to set up a development environment for GraphX, and when I
> visited the Maven central repository
> (https://mvnrepository.com/artifact/org.apache.spark/spark-graphx) I saw that
> version 2.4.0 was already available. Does this mean that the new version of
> Apache Spark was released? It seems quite surprising to me because I didn't
> find any release information and the 2.4 artifact was deployed on 29/10/2018.
> Maybe somebody here has some explanation for that?
> 
> Best regards,
> Bartosz Konieczny.

