Re: Spark LogisticRegression got stuck on dataset with millions of columns

2019-04-23 Thread Weichen Xu
Could you provide your code and your cluster configuration?
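
Not a fix for the reported hang, but for reference, a minimal sketch (the data, sizes and parameter values below are made up for illustration) of feeding a very wide, mostly-zero dataset to spark.ml's LogisticRegression as sparse vectors; `setAggregationDepth` is shown because raising it is a common knob when the coefficient vector is very large:

```
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.linalg.Vectors

import spark.implicits._  // assumes a SparkSession named `spark` is in scope

// Hypothetical input: (label, sorted indices of the non-zero features); 1.7M columns total.
val numFeatures = 1700000
val training = Seq(
  (1.0, Array(3, 17, 4021)),
  (0.0, Array(8, 99, 123456))
).map { case (label, idx) =>
  // Sparse vectors store only the non-zero positions.
  (label, Vectors.sparse(numFeatures, idx, Array.fill(idx.length)(1.0)))
}.toDF("label", "features")

val lr = new LogisticRegression()
  .setMaxIter(50)
  .setRegParam(0.01)
  .setAggregationDepth(4) // deeper treeAggregate reduces driver pressure for very wide models

val model = lr.fit(training)
println(s"Non-zero coefficients: ${model.coefficients.numNonzeros}")
```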

On Tue, Apr 23, 2019 at 4:10 PM Qian He  wrote:

> The dataset was using a sparse representation before being fed into
> LogisticRegression.
>
> On Tue, Apr 23, 2019 at 3:15 PM Weichen Xu 
> wrote:
>
>> Hi Qian,
>>
>> Does your dataset use the sparse vector format?
>>
>>
>>
>> On Mon, Apr 22, 2019 at 5:03 PM Qian He  wrote:
>>
>>> Hi all,
>>>
>>> I'm using the Spark-provided LogisticRegression to fit a dataset. Each row
>>> of the data has 1.7 million columns, but it is sparse, with only hundreds of
>>> 1s. The Spark UI reported high GC time while the model was being trained, and
>>> my Spark application got stuck without any response. I have allocated 100
>>> executors with 8g each.
>>>
>>> Is there anything I should do to make the training process succeed?
>>>
>>


Re: Spark LogisticRegression got stuck on dataset with millions of columns

2019-04-23 Thread Weichen Xu
Hi Qian,

Does your dataset use the sparse vector format?



On Mon, Apr 22, 2019 at 5:03 PM Qian He  wrote:

> Hi all,
>
> I'm using the Spark-provided LogisticRegression to fit a dataset. Each row of
> the data has 1.7 million columns, but it is sparse, with only hundreds of
> 1s. The Spark UI reported high GC time while the model was being trained, and
> my Spark application got stuck without any response. I have allocated 100
> executors with 8g each.
>
> Is there anything I should do to make the training process succeed?
>


Re: Has there been any explanation on the performance degradation between spark.ml and Mllib?

2018-01-22 Thread Weichen Xu
Hi Stephen,

I agree with what Nick said; the ML vs. MLlib comparison test seems to be flawed.

LR in Spark MLlib uses SGD: in each training iteration, SGD samples only a
small fraction of the data and computes the gradient on that sample, whereas
L-BFGS has to aggregate over the whole input dataset in every iteration. So
each L-BFGS iteration takes longer when the dataset is large.

On the other hand, L-BFGS is a quasi-Newton method, so it converges faster
(nearly quadratically), while SGD only converges linearly, and we also have to
tune the SGD step size, otherwise convergence can be very slow.
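
For reference, a rough sketch of the two code paths being compared (the file path and parsing are placeholders):

```
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.mllib.classification.LogisticRegressionWithSGD
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.linalg.{Vectors => OldVectors}

// spark.mllib: SGD-based LR over an RDD[LabeledPoint]; cheap per iteration, needs step-size tuning.
// (LogisticRegressionWithSGD is deprecated since Spark 2.0.)
val points = spark.sparkContext.textFile("/path/to/data.csv").map { line =>
  val cols = line.split(',').map(_.toDouble)
  LabeledPoint(cols.head, OldVectors.dense(cols.tail))
}.cache()
val sgdModel = LogisticRegressionWithSGD.train(points, /* numIterations = */ 100)

// spark.ml: L-BFGS-based LR over a DataFrame; each iteration scans the full
// dataset, but fewer iterations are usually needed.
import spark.implicits._
val df = points.map(p => (p.label, p.features.asML)).toDF("label", "features")
val lbfgsModel = new LogisticRegression().setMaxIter(100).fit(df)
```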

On Sun, Jan 21, 2018 at 11:31 PM, Nick Pentreath 
wrote:

> At least one of their comparisons is flawed.
>
> The Spark ML version of linear regression (*note* they use linear
> regression and not logistic regression, it is not clear why) uses L-BFGS as
> the solver, not SGD (as MLLIB uses). Hence it is typically going to be
> slower. However, it should in most cases converge to a better solution.
> MLLIB doesn't offer an L-BFGS version for linear regression, but it does
> for logistic regression.
>
> In my view a more sensible comparison would be between LogReg with L-BFGS
> between ML and MLLIB. These should be close to identical since now the
> MLLIB version actually wraps the ML version.
>
> They also don't show any results for algorithm performance (accuracy, AUC
> etc). The better comparison to make is the run-time to achieve the same AUC
> (for example). SGD may be fast, but it may result in a significantly poorer
> solution relative to say L-BFGS.
>
> Note that the "withSGD" algorithms are deprecated in MLLIB partly to move
> users to ML, but also partly because their performance in terms of accuracy
> is relatively poor and the amount of tuning required (e.g. learning rates)
> is high.
>
> They say:
>
> The time difference between Spark MLlib and Spark ML can be explained by
> internally transforming the dataset from DataFrame to RDD in order to use
> the same implementation of the algorithm present in MLlib.
>
> but this is not true for the LR example.
>
> For the feature selection example, it is probably mostly due to the
> conversion, but even then the difference seems larger than what I would
> expect. It would be worth investigating their implementation to see if
> there are other potential underlying causes.
>
>
> On Sun, 21 Jan 2018 at 23:49 Stephen Boesch  wrote:
>
>> While MLlib performed favorably vs. Flink, it *also* performed favorably
>> vs. spark.ml, and by an *order of magnitude*. The following is one of
>> the tables - it is for Logistic Regression. At that time spark.ml did not
>> yet support SVM.
>>
>> From: https://bdataanalytics.biomedcentral.com/articles/10.1186/s41044-016-0020-2
>>
>>
>>
>> Table 3: LR learning time in seconds
>>
>> Dataset      | Spark MLlib | Spark ML | Flink
>> -------------|-------------|----------|------
>> ECBDL14-10   |           3 |       26 |   181
>> ECBDL14-30   |           5 |       63 |   815
>> ECBDL14-50   |           6 |      173 |  1314
>> ECBDL14-75   |           8 |      260 |  1878
>> ECBDL14-100  |          12 |      415 |  2566
>>
>> The DataFrame-based API (spark.ml) is even slower vs. the RDD-based one (mllib)
>> than had been anticipated - yet the latter has been shut down for several
>> versions of Spark already. What is the thought process behind that
>> decision: *performance matters!* Is there visibility into a meaningful
>> narrowing of that gap?
>>
>


Re: Please Help with DecisionTree/FeatureIndexer

2017-12-19 Thread Weichen Xu
Hi Marco,

Do not call any individual fit/transform yourself. You only need to call
`pipeline.fit` / `pipelineModel.transform`, like the following:

val assembler = new VectorAssembler().
  setInputCols(inputData.columns.filter(_ != "Severity")).
  setOutputCol("features")

val data = assembler.transform(inputData)

val labelIndexer = new StringIndexer()
  .setInputCol("Severity")
  .setOutputCol("indexedLabel")

val featureIndexer =
  new VectorIndexer()
  .setInputCol("features")
  .setOutputCol("indexedFeatures")
  .setMaxCategories(5) // features with > 5 distinct values are treated as continuous.

val Array(trainingData, testData) = data.randomSplit(Array(0.8, 0.2))
// Train a DecisionTree model.
val dt = new DecisionTreeClassifier()
  .setLabelCol("indexedLabel")
  .setFeaturesCol("indexedFeatures")

// Convert indexed labels back to original labels.
val labelConverter = new IndexToString()
  .setInputCol("prediction")
  .setOutputCol("predictedLabel")
  .setLabels(labelIndexer.labels)

// Chain indexers and tree in a Pipeline.
val pipeline = new Pipeline()
  .setStages(Array(assembler, labelIndexer, featureIndexer, dt, labelConverter))

trainingData.cache()
testData.cache()


// Train model. This also runs the indexers.
val model = pipeline.fit(trainingData)

// Make predictions.
val predictions = model.transform(testData)


Thanks.

On Wed, Dec 20, 2017 at 5:26 AM, Marco Mistroni <mmistr...@gmail.com> wrote:

> Hello Weichen
>  I will try it out and let you know.
> But if I add the assembler to the pipeline, do I still have to call
> Assembler.transform and XXXIndexer.fit()?
> kind regards
>  Marco
>
> On Tue, Dec 19, 2017 at 2:45 AM, Weichen Xu <weichen...@databricks.com>
> wrote:
>
>> Hi Marco,
>>
>> If you add the assembler as the first stage of the pipeline, like:
>> ```
>>  val pipeline = new Pipeline()
>>   .setStages(Array(assembler, labelIndexer, featureIndexer, dt,
>> labelConverter))
>> ```
>>
>> Which error did you get?
>>
>> I think it should work fine once the `assembler` is added to the pipeline.
>>
>> Thanks.
>>
>> On Tue, Dec 19, 2017 at 6:08 AM, Marco Mistroni <mmistr...@gmail.com>
>> wrote:
>>
>>> Hello Weichen
>>>  sorry to bother you again with my ML issue... but I feel you have more
>>> experience than I do in this, and perhaps you can tell me whether I am
>>> following the correct steps, as I seem to get confused by the different
>>> examples on Decision Trees
>>>
>>> So, as a starting point i have this dataframe
>>>
>>> [BI-RADS, Age, Shape, Margin,Density,Severity]
>>>
>>> The label is 'Severity' and all others are features.
>>> I am following these steps and I was wondering if you can advise whether I am
>>> doing the correct thing, as I am unable to add the assembler at the
>>> beginning of the pipeline, resorting instead to the following code
>>> 
>>>
>>> val assembler = new VectorAssembler().
>>>   setInputCols(inputData.columns.filter(_ != "Severity")).
>>>   setOutputCol("features")
>>>
>>> val data = assembler.transform(inputData)
>>>
>>> val labelIndexer = new StringIndexer()
>>>   .setInputCol("Severity")
>>>   .setOutputCol("indexedLabel")
>>>   .fit(data)
>>>
>>> val featureIndexer =
>>>   new VectorIndexer()
>>>   .setInputCol("features")
>>>   .setOutputCol("indexedFeatures")
>>>   .setMaxCategories(5) // features with > 4 distinct values are
>>> treated as continuous.
>>>   .fit(data)
>>>
>>> val Array(trainingData, testData) = data.randomSplit(Array(0.8, 0.2))
>>> // Train a DecisionTree model.
>>> val dt = new DecisionTreeClassifier()
>>>   .setLabelCol("indexedLabel")
>>>   .setFeaturesCol("indexedFeatures")
>>>
>>> // Convert indexed labels back to original labels.
>>>   val labelConverter = new IndexToString()
>>>   .setInputCol("prediction")
>>>   .setOutputCol("predictedLabel")
>>>   .setLabels(labelIndexer.labels)
>>>
>>> // Chain indexers and tree in a Pipeline.
>>> val pipeline = new Pipeline()
>>>   .setStages(Array(labelIndexer, featureIndexer, dt, labelConverter))

Re: Help Required on Spark - Convert DataFrame to List with out using collect

2017-12-18 Thread Weichen Xu
Hi Sunitha,

In the mapper function you cannot update outer (driver-side) variables, so
`personLst.add(person)` has no effect on the driver; that is why you end up
with an empty list.

You can use `rdd.collect()` to get a local list of `Person` objects first;
then you can safely iterate over the local list and do any updates you want.

Thanks.
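
The original snippet is Java; a rough Scala equivalent of the collect-then-iterate approach looks like this (`personDataframe` and `callAnotherService` are placeholders):

```
// Sketch only: collect the mapped rows to the driver, then iterate locally.
case class Person(id: Long, name: String)

val people: Array[Person] = personDataframe.rdd.map { row =>
  Person(row.getDecimal(0).longValue(), row.getString(1))
}.collect()   // safe as long as the result fits in driver memory

people.foreach { p =>
  callAnotherService(p.id)   // placeholder for the "other method" mentioned above
}
```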

On Tue, Dec 19, 2017 at 2:16 PM, Sunitha Chennareddy <
chennareddysuni...@gmail.com> wrote:

> Hi Deepak,
>
> I am able to map a row to the Person class; the issue is that I want to call
> another method.
> I tried converting to a list, and it is not working without using collect.
>
> Regards
> Sunitha
> On Tuesday, December 19, 2017, Deepak Sharma 
> wrote:
>
>> I am not sure about Java, but in Scala it would be something like
>> df.rdd.map{ x => MyClass(x.getString(0), ...) }
>>
>> HTH
>>
>> --Deepak
>>
>> On Dec 19, 2017 09:25, "Sunitha Chennareddy" wrote:
>>
>> Hi All,
>>
>> I am new to Spark. I want to convert a DataFrame to a List without
>> using collect().
>>
>> The main requirement is that I need to iterate through the rows of the
>> dataframe and call another function, passing a column value of each row
>> (person.getId()).
>>
>> Here is the snippet I have tried. Kindly help me to resolve the issue;
>> personLst is returning 0:
>>
>> List<Person> personLst = new ArrayList<>();
>> JavaRDD<Person> personRDD = person_dataframe.toJavaRDD().map(new
>> Function<Row, Person>() {
>>   public Person call(Row row) throws Exception {
>>       Person person = new Person();
>>       person.setId(row.getDecimal(0).longValue());
>>       person.setName(row.getString(1));
>>
>>       personLst.add(person);
>>       // here I tried to call another function but control never passed
>>       return person;
>>   }
>> });
>> logger.info("personLst size == " + personLst.size());
>> logger.info("personRDD count === " + personRDD.count());
>>
>> //output is
>> personLst size == 0
>> personRDD count === 3
>>
>>
>>


Re: Please Help with DecisionTree/FeatureIndexer

2017-12-18 Thread Weichen Xu
Hi Marco,

If you add the assembler as the first stage of the pipeline, like:
```
 val pipeline = new Pipeline()
  .setStages(Array(assembler, labelIndexer, featureIndexer, dt,
labelConverter))
```

Which error did you get?

I think it should work fine once the `assembler` is added to the pipeline.

Thanks.

On Tue, Dec 19, 2017 at 6:08 AM, Marco Mistroni <mmistr...@gmail.com> wrote:

> Hello Weichen
>  sorry to bother you again with my ML issue... but I feel you have more
> experience than I do in this, and perhaps you can tell me whether I am
> following the correct steps, as I seem to get confused by the different
> examples on Decision Trees
>
> So, as a starting point i have this dataframe
>
> [BI-RADS, Age, Shape, Margin,Density,Severity]
>
> The label is 'Severity' and all others are features.
> I am following these steps and I was wondering if you can advise whether I am
> doing the correct thing, as I am unable to add the assembler at the
> beginning of the pipeline, resorting instead to the following code
> 
>
> val assembler = new VectorAssembler().
>   setInputCols(inputData.columns.filter(_ != "Severity")).
>   setOutputCol("features")
>
> val data = assembler.transform(inputData)
>
> val labelIndexer = new StringIndexer()
>   .setInputCol("Severity")
>   .setOutputCol("indexedLabel")
>   .fit(data)
>
> val featureIndexer =
>   new VectorIndexer()
>   .setInputCol("features")
>   .setOutputCol("indexedFeatures")
>   .setMaxCategories(5) // features with > 4 distinct values are
> treated as continuous.
>   .fit(data)
>
> val Array(trainingData, testData) = data.randomSplit(Array(0.8, 0.2))
> // Train a DecisionTree model.
> val dt = new DecisionTreeClassifier()
>   .setLabelCol("indexedLabel")
>   .setFeaturesCol("indexedFeatures")
>
> // Convert indexed labels back to original labels.
>   val labelConverter = new IndexToString()
>   .setInputCol("prediction")
>   .setOutputCol("predictedLabel")
>   .setLabels(labelIndexer.labels)
>
> // Chain indexers and tree in a Pipeline.
> val pipeline = new Pipeline()
>   .setStages(Array(labelIndexer, featureIndexer, dt, labelConverter))
>
> trainingData.cache()
> testData.cache()
>
>
> // Train model. This also runs the indexers.
> val model = pipeline.fit(trainingData)
>
> // Make predictions.
> val predictions = model.transform(testData)
>
> // Select example rows to display.
> predictions.select("predictedLabel", "indexedLabel",
> "indexedFeatures").show(5)
>
> // Select (prediction, true label) and compute test error.
> val evaluator = new MulticlassClassificationEvaluator()
>   .setLabelCol("indexedLabel")
>   .setPredictionCol("prediction")
>   .setMetricName("accuracy")
> val accuracy = evaluator.evaluate(predictions)
> println("Test Error = " + (1.0 - accuracy))
>
> Could you advise if this is the proper way to follow when using an
> Assembler?
> I was unable to add the Assembler at the beginning of the pipeline... it
> seems it didn't get invoked, as at the moment of calling the FeatureIndexer
> the column 'features' was not found.
>
> This is not urgent; I'll appreciate it if you can give me your comments.
> kind regards
>  marco
>
>
>
>
>
>
> On Sun, Dec 17, 2017 at 2:48 AM, Weichen Xu <weichen...@databricks.com>
> wrote:
>
>> Hi Marco,
>>
>> Yes, you can apply `VectorAssembler` first in the pipeline to assemble
>> multiple feature columns.
>>
>> Thanks.
>>
>> On Sun, Dec 17, 2017 at 6:33 AM, Marco Mistroni <mmistr...@gmail.com>
>> wrote:
>>
>>> Hello Wei
>>>  Thanks, I should have checked the data.
>>> My data has this format:
>>> |col1|col2|col3|label|
>>>
>>> so it looks like I cannot use VectorIndexer directly (it accepts a
>>> Vector column).
>>> I am guessing what I should do is something like this (given I have a few
>>> categorical features)
>>>
>>> val assembler = new VectorAssembler().
>>>   setInputCols(inputData.columns.filter(_ != "Label")).
>>>   setOutputCol("features")
>>>
>>> val transformedData = assembler.transform(inputData)
>>>
>>>
>>> val featureIndexer =
>>>   new VectorIndexer()
>>>   .setInputCol("features")
>>> 

Re: Please Help with DecisionTree/FeatureIndexer

2017-12-16 Thread Weichen Xu
Hi Marco,

Yes, you can apply `VectorAssembler` first in the pipeline to assemble
multiple feature columns.

Thanks.

On Sun, Dec 17, 2017 at 6:33 AM, Marco Mistroni <mmistr...@gmail.com> wrote:

> Hello Wei
>  Thanks, I should have checked the data.
> My data has this format:
> |col1|col2|col3|label|
>
> so it looks like I cannot use VectorIndexer directly (it accepts a Vector
> column).
> I am guessing what I should do is something like this (given I have a few
> categorical features)
>
> val assembler = new VectorAssembler().
>   setInputCols(inputData.columns.filter(_ != "Label")).
>   setOutputCol("features")
>
> val transformedData = assembler.transform(inputData)
>
>
> val featureIndexer =
>   new VectorIndexer()
>   .setInputCol("features")
>   .setOutputCol("indexedFeatures")
>   .setMaxCategories(5) // features with > 4 distinct values are
> treated as continuous.
>   .fit(transformedData)
>
> ?
> Apologies for the basic question, but the last time I worked on an ML project I
> was using Spark 1.x.
>
> kr
>  marco
>
>
>
>
>
>
>
>
>
> On Dec 16, 2017 1:24 PM, "Weichen Xu" <weichen...@databricks.com> wrote:
>
>> Hi, Marco,
>>
>> val data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
>>
>> The data now includes a feature column named "features":
>>
>> val featureIndexer = new VectorIndexer()
>>   .setInputCol("features")   // <-- here specify the "features" column to index
>>   .setOutputCol("indexedFeatures")
>>
>>
>> Thanks.
>>
>>
>> On Sat, Dec 16, 2017 at 6:26 AM, Marco Mistroni <mmistr...@gmail.com>
>> wrote:
>>
>>> Hi all,
>>>  I am trying to run a sample decision tree, following the examples here (for
>>> MLlib):
>>>
>>> https://spark.apache.org/docs/latest/ml-classification-regression.html#decision-tree-classifier
>>>
>>> The example seems to use a VectorIndexer; however, I am missing
>>> something.
>>> How does the featureIndexer know which columns are features?
>>> Isn't there something missing? Or is the featureIndexer able to figure
>>> out by itself
>>> which columns of the DataFrame are features?
>>>
>>> val labelIndexer = new StringIndexer()
>>>   .setInputCol("label")
>>>   .setOutputCol("indexedLabel")
>>>   .fit(data)
>>>
>>> // Automatically identify categorical features, and index them.
>>> val featureIndexer = new VectorIndexer()
>>>   .setInputCol("features")
>>>   .setOutputCol("indexedFeatures")
>>>   .setMaxCategories(4) // features with > 4 distinct values are treated as continuous.
>>>   .fit(data)
>>>
>>> Using this code i am getting back this exception
>>>
>>> Exception in thread "main" java.lang.IllegalArgumentException: Field 
>>> "features" does not exist.
>>> at 
>>> org.apache.spark.sql.types.StructType$$anonfun$apply$1.apply(StructType.scala:266)
>>> at 
>>> org.apache.spark.sql.types.StructType$$anonfun$apply$1.apply(StructType.scala:266)
>>> at scala.collection.MapLike$class.getOrElse(MapLike.scala:128)
>>> at scala.collection.AbstractMap.getOrElse(Map.scala:59)
>>> at org.apache.spark.sql.types.StructType.apply(StructType.scala:265)
>>> at 
>>> org.apache.spark.ml.util.SchemaUtils$.checkColumnType(SchemaUtils.scala:40)
>>> at 
>>> org.apache.spark.ml.feature.VectorIndexer.transformSchema(VectorIndexer.scala:141)
>>> at 
>>> org.apache.spark.ml.PipelineStage.transformSchema(Pipeline.scala:74)
>>> at 
>>> org.apache.spark.ml.feature.VectorIndexer.fit(VectorIndexer.scala:118)
>>>
>>> What am I missing?
>>>
>>> w/kindest regards,
>>>
>>>  marco
>>>
>>>
>>


Re: Please Help with DecisionTree/FeatureIndexer

2017-12-16 Thread Weichen Xu
Hi, Marco,

val data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")

The data now includes a feature column named "features":

val featureIndexer = new VectorIndexer()
  .setInputCol("features")   // <-- here specify the "features" column to index
  .setOutputCol("indexedFeatures")


Thanks.


On Sat, Dec 16, 2017 at 6:26 AM, Marco Mistroni  wrote:

> Hi all,
>  I am trying to run a sample decision tree, following the examples here (for
> MLlib):
>
> https://spark.apache.org/docs/latest/ml-classification-regression.html#decision-tree-classifier
>
> The example seems to use a VectorIndexer; however, I am missing something.
> How does the featureIndexer know which columns are features?
> Isn't there something missing? Or is the featureIndexer able to figure
> out by itself
> which columns of the DataFrame are features?
>
> val labelIndexer = new StringIndexer()
>   .setInputCol("label")
>   .setOutputCol("indexedLabel")
>   .fit(data)
>
> // Automatically identify categorical features, and index them.
> val featureIndexer = new VectorIndexer()
>   .setInputCol("features")
>   .setOutputCol("indexedFeatures")
>   .setMaxCategories(4) // features with > 4 distinct values are treated as continuous.
>   .fit(data)
>
> Using this code i am getting back this exception
>
> Exception in thread "main" java.lang.IllegalArgumentException: Field 
> "features" does not exist.
> at 
> org.apache.spark.sql.types.StructType$$anonfun$apply$1.apply(StructType.scala:266)
> at 
> org.apache.spark.sql.types.StructType$$anonfun$apply$1.apply(StructType.scala:266)
> at scala.collection.MapLike$class.getOrElse(MapLike.scala:128)
> at scala.collection.AbstractMap.getOrElse(Map.scala:59)
> at org.apache.spark.sql.types.StructType.apply(StructType.scala:265)
> at 
> org.apache.spark.ml.util.SchemaUtils$.checkColumnType(SchemaUtils.scala:40)
> at 
> org.apache.spark.ml.feature.VectorIndexer.transformSchema(VectorIndexer.scala:141)
> at 
> org.apache.spark.ml.PipelineStage.transformSchema(Pipeline.scala:74)
> at 
> org.apache.spark.ml.feature.VectorIndexer.fit(VectorIndexer.scala:118)
>
> What am I missing?
>
> w/kindest regards,
>
>  marco
>
>


Re: Row Encoder For DataSet

2017-12-07 Thread Weichen Xu
You can groupBy multiple columns on a DataFrame, so why do you need such a
complicated schema?

suppose df schema: (x, y, u, v, z)

df.groupBy($"x", $"y").agg(...)

Is this what you want?
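
For illustration, a minimal runnable sketch of the suggestion above, using made-up data with the same (x, y, u, v, z) shape (assuming a SparkSession named `spark`):

```
import org.apache.spark.sql.functions._
import spark.implicits._

val df = Seq(
  (10, 11, 10, 2, 11),
  (10, 11, 10, 2, 11),
  (20, 11, 10, 2, 11)
).toDF("x", "y", "u", "v", "z")

// Group directly on the two key columns instead of nesting them in a struct.
val aggregated = df.groupBy($"x", $"y")
  .agg(sum($"u").as("sum_u"), avg($"z").as("avg_z"), count(lit(1)).as("n"))

aggregated.show()
```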

On Fri, Dec 8, 2017 at 11:51 AM, Sandip Mehta 
wrote:

> Hi,
>
> During my aggregation I end up with the following schema:
>
> Row(Row(val1,val2), Row(val1,val2,val3...))
>
> val values = Seq(
> (Row(10, 11), Row(10, 2, 11)),
> (Row(10, 11), Row(10, 2, 11)),
> (Row(20, 11), Row(10, 2, 11))
>   )
>
>
> The 1st tuple is used to group the relevant records for aggregation. I have
> used the following to create the dataset:
>
> val s = StructType(Seq(
>   StructField("x", IntegerType, true),
>   StructField("y", IntegerType, true)
> ))
> val s1 = StructType(Seq(
>   StructField("u", IntegerType, true),
>   StructField("v", IntegerType, true),
>   StructField("z", IntegerType, true)
> ))
>
> val ds = 
> sparkSession.sqlContext.createDataset(sparkSession.sparkContext.parallelize(values))(Encoders.tuple(RowEncoder(s),
>  RowEncoder(s1)))
>
> Is this the correct way of representing this?
>
> How do I create a dataset and row encoder for such a use case, for doing
> groupByKey on this?
>
>
>
> Regards
> Sandeep
>


Re: StringIndexer on several columns in a DataFrame with Scala

2017-10-30 Thread Weichen Xu
Yes, I am working on this. Sorry for the delay, but I will try to submit a PR ASAP.
Thanks!

On Mon, Oct 30, 2017 at 5:19 PM, Nick Pentreath 
wrote:

> For now, you must follow this approach of constructing a pipeline
> consisting of a StringIndexer for each categorical column. See
> https://issues.apache.org/jira/browse/SPARK-11215 for the related JIRA to
> allow multiple columns for StringIndexer, which is being worked on
> currently.
>
> The reason you're seeing a NPE is:
>
> var indexers: Array[StringIndexer] = null
>
> and then you're trying to append an element to something that is null.
>
> Try this instead:
>
> var indexers: Array[StringIndexer] = Array()
>
>
> But even better is a more functional approach:
>
> val indexers = featureCol.map { colName =>
>
>   new StringIndexer().setInputCol(colName).setOutputCol(colName + "_indexed")
>
> }
>
>
> On Fri, 27 Oct 2017 at 22:29 Md. Rezaul Karim <
> rezaul.ka...@insight-centre.org> wrote:
>
>> Hi All,
>>
>> There are several categorical columns in my dataset as follows:
>> [image: grafik.png]
>>
>> How can I transform the values in each (categorical) column into numeric ones
>> using StringIndexer, so that the resulting DataFrame can be fed into
>> VectorAssembler to generate a feature vector?
>>
>> A naive approach I can try is using a StringIndexer for each categorical
>> column. But that sounds hilarious, I know.
>> A possible workaround in PySpark is combining several StringIndexers in a
>> list and using a Pipeline to execute them all, as follows:
>>
>> from pyspark.ml import Pipeline
>> from pyspark.ml.feature import StringIndexer
>>
>> indexers = [StringIndexer(inputCol=column, outputCol=column+"_index").fit(df)
>>             for column in list(set(df.columns)-set(['date']))]
>> pipeline = Pipeline(stages=indexers)
>> df_r = pipeline.fit(df).transform(df)
>> df_r.show()
>>
>> How I can do the same in Scala? I tried the following:
>>
>> val featureCol = trainingDF.columns
>> var indexers: Array[StringIndexer] = null
>>
>> for (colName <- featureCol) {
>>   val index = new StringIndexer()
>> .setInputCol(colName)
>> .setOutputCol(colName + "_indexed")
>> //.fit(trainDF)
>>   indexers = indexers :+ index
>> }
>>
>>  val pipeline = new Pipeline()
>> .setStages(indexers)
>> val newDF = pipeline.fit(trainingDF).transform(trainingDF)
>> newDF.show()
>>
>> However, I am getting a NullPointerException at
>>
>> for (colName <- featureCol)
>>
>> I am sure I am doing something wrong. Any suggestions?
>>
>>
>>
>> Regards,
>> _
>> *Md. Rezaul Karim*, BSc, MSc
>> Researcher, INSIGHT Centre for Data Analytics
>> National University of Ireland, Galway
>> IDA Business Park, Dangan, Galway, Ireland
>> Web: http://www.reza-analytics.eu/index.html
>> 
>>
>


Re: Zero Coefficient in logistic regression

2017-10-24 Thread Weichen Xu
Yes, the chi-squared statistic is only applicable to categorical features. It
does not look appropriate here.
Thanks!

On Tue, Oct 24, 2017 at 5:13 PM, Simon Dirmeier 
wrote:

> Hey,
> as far as I know, feature selection using a chi-squared statistic can
> only be done on categorical features and not on possibly continuous ones.
> Furthermore, since your logistic model doesn't use any regularization, you
> should be fine here. So I'd check the ChiSqSelector and possibly replace it
> with another feature selection method.
>
> There is however always the chance that your response does not depend on
> your covariables, so you'd estimate a zero coefficient.
>
> Cheers,
> Simon
>
>
> Am 24.10.17 um 04:56 schrieb Alexis Peña:
>
> Hi Guys,
>
>
>
> We are fitting a Logistic model using the following code.
>
>
>
>
>
> val Chisqselector = new ChiSqSelector().setNumTopFeatures(10).
> setFeaturesCol("VECTOR_1").setLabelCol("TARGET").setOutputCol("
> selectedFeatures")
>
> val assembler = new VectorAssembler().setInputCols(Array("FEATURES",
> "selectedFeatures", "PROM_MESES_DIST", "RECENCIA", "TEMP_MIN", "TEMP_MAX",
> "PRECIPITACIONES")).setOutputCol("Union")
>
> val lr = new LogisticRegression().setLabelCol("TARGET").
> setFeaturesCol("Union")
>
> val pipeline = new Pipeline().setStages(Array(Chisqselector, assembler,
> lr))
>
>
>
>
>
> Do you know why the coefficients for the following features are estimated as
> zero? Is this produced by the ChiSqSelector or by the logistic model?
>
>
>
> Thanks in advance!!
>
>
>
>
>
> CODIGO     | PARAMETRO        | COEFICIENTES_MUESTREO_BALANCEADO
> -----------|------------------|---------------------------------
> PROPIAS    | CV_UM            | 0,276866756
> PROPIAS    | CV_U3M           | -0,241851427
> PROPIAS    | CV_U6M           | -0,568312819
> PROPIAS    | CV_U12M          | 0,134706601
> PROPIAS    | M_UM             | 5,47E-06
> PROPIAS    | M_U3M            | -7,10E-06
> PROPIAS    | M_U6M            | 1,73E-05
> PROPIAS    | M_U12M           | -5,41E-06
> PROPIAS    | CP_UM            | -0,050750105
> PROPIAS    | CP_U3M           | 0,125483162
> PROPIAS    | CP_U6M           | -0,353906788
> PROPIAS    | CP_U12M          | 0,159538155
> PROPIAS    | TUM              | -0,020217902
> PROPIAS    | TU3M             | 0,002101906
> PROPIAS    | TU6M             | -0,005481915
> PROPIAS    | TU12M            | 0,003443081
> CRUZADAS   | 2303             | 0
> CRUZADAS   | 3901             | 0
> CRUZADAS   | 3905             | 0
> CRUZADAS   | 3907             | 0
> CRUZADAS   | 3909             | 0
> CRUZADAS   | 4102             | 0
> CRUZADAS   | 4307             | 0
> CRUZADAS   | 4501             | 0
> CRUZADAS   | 4907             | 0,247624087
> CRUZADAS   | 5304             | -0,161424508
> LP         | PROM_MESES_DIST  | -0,680356554
> PROPIAS    | RECENCIA         | -0,00289069
> EXTERNAS   | TEMP_MIN         | 0,006488683
> EXTERNAS   | TEMP_MAX         | -0,013497441
> EXTERNAS   | PRECIPITACIONES  | -0,007607086
> INTERCEPTO |                  | 2,401593191
>
>
>
>
>


Re: Spark ML - LogisticRegression interpreting prediction

2017-10-22 Thread Weichen Xu
The values you want (the ones that add up to 1.0) are in the "probability"
column, not "rawPrediction".

Thanks!
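
For example, a small sketch (assuming a fitted LogisticRegression model `lrModel` and a test DataFrame `testData`):

```
// The "probability" column holds the class probabilities, which sum to 1.0 per row;
// "rawPrediction" holds the unbounded margins, which is why they do not sum to 1.
val predictions = lrModel.transform(testData)
predictions.select("rawPrediction", "probability", "prediction").show(truncate = false)
```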

On Mon, Oct 23, 2017 at 1:20 AM, pun  wrote:

> Hello,
> I have a LogisticRegression model for predicting a binary label. Once I
> train the model, I run it to get some predictions. I get the following
> values for rawPrediction. How should I interpret these? What do they mean?
>
> ++
> |rawPrediction   |
> ++
> |[30.376879013053156,-30.376879013053156]|
> |[32.08591062636529,-32.08591062636529]  |
> |[34.67079346038218,-34.67079346038218]  |
>
> From scikit-learn, I believe, the two values for each user add up to 1.
> TIA
> --
> Sent from the Apache Spark User List mailing list archive
>  at Nabble.com.
>


Re: jar file problem

2017-10-19 Thread Weichen Xu
Use the `--jars` option of `bin/spark-submit`; it distributes the listed jars to the executors for you.

On Thu, Oct 19, 2017 at 11:54 PM, 郭鹏飞  wrote:

> You can use the bin/spark-submit tool to submit your jar to the cluster.
>
> > On Oct 19, 2017, at 11:24 PM, Uğur Sopaoğlu wrote:
> >
> > Hello,
> >
> > I have a very simple problem. Whenever I run a Spark job, I must copy the jar file
> to all worker nodes. Is there an easier way to do this?
> >
> > --
> > Uğur Sopaoğlu
>
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: Is there a difference between df.cache() vs df.rdd.cache()

2017-10-13 Thread Weichen Xu
Hi Supun,

The DataFrame API is NOT using the old RDD implementation under the covers;
DataFrame has its own implementation (DataFrames use a binary row format, and
columnar storage when cached). So a DataFrame has no direct relationship with
the `RDD[Row]` you want to get.

When you call `df.rdd` and then cache it, Spark has to turn the DataFrame into
an RDD: it extracts each row from the DataFrame, deserializes it, and composes
a new RDD.

Thanks!
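
A short sketch of the difference (any DataFrame `df` will do):

```
// Caches the DataFrame itself, stored in Spark SQL's compressed columnar format.
df.cache()
df.count()          // materializes the DataFrame cache; later DataFrame ops reuse it

// `df.rdd` builds a *new* RDD[Row] by deserializing each row out of the DataFrame,
// so caching it creates a separate (and more memory-hungry) cache that
// DataFrame operations never use.
val rowRdd = df.rdd
rowRdd.cache()
rowRdd.count()
```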

On Sat, Oct 14, 2017 at 6:17 AM, Stephen Boesch <java...@gmail.com> wrote:

> @Vadim   Would it be true to say the `.rdd` *may* be creating a new job -
> depending on whether the DataFrame/DataSet had already been materialized
> via an action or checkpoint?   If the only prior operations on the
> DataFrame had been transformations then the dataframe would still not have
> been calculated.  In that case would it also be true that a subsequent
> action/checkpoint on the DataFrame (not the rdd) would then generate a
> separate job?
>
> 2017-10-13 14:50 GMT-07:00 Vadim Semenov <vadim.seme...@datadoghq.com>:
>
>> When you do `Dataset.rdd` you actually create a new job
>>
>> here you can see what it does internally:
>> https://github.com/apache/spark/blob/master/sql/core/src/mai
>> n/scala/org/apache/spark/sql/Dataset.scala#L2816-L2828
>>
>>
>>
>> On Fri, Oct 13, 2017 at 5:24 PM, Supun Nakandala <
>> supun.nakand...@gmail.com> wrote:
>>
>>> Hi Weichen,
>>>
>>> Thank you for the reply.
>>>
>>> My understanding was that the DataFrame API is using the old RDD implementation
>>> under the covers, though it presents a different API, and that calling
>>> df.rdd will simply give access to the underlying RDD. Is this assumption
>>> wrong? I would appreciate it if you could shed more light on this issue or
>>> point me to documentation where I can learn about it.
>>>
>>> Thank you in advance.
>>>
>>> On Fri, Oct 13, 2017 at 3:19 AM, Weichen Xu <weichen...@databricks.com>
>>> wrote:
>>>
>>>> You should use `df.cache()`
>>>> `df.rdd.cache()` won't work, because `df.rdd` generate a new RDD from
>>>> the original `df`. and then cache the new RDD.
>>>>
>>>> On Fri, Oct 13, 2017 at 3:35 PM, Supun Nakandala <
>>>> supun.nakand...@gmail.com> wrote:
>>>>
>>>>> Hi all,
>>>>>
>>>>> I have been experimenting with the cache/persist/unpersist methods with
>>>>> respect to both the DataFrame and RDD APIs. However, I am seeing different
>>>>> behavior with the DataFrame API compared to the RDD API, such as DataFrames
>>>>> not getting cached when count() is called.
>>>>>
>>>>> Is there a difference in how these operations act with respect to the
>>>>> DataFrame and RDD APIs?
>>>>>
>>>>> Thank You.
>>>>> -Supun
>>>>>
>>>>
>>>>
>>>
>>
>


Re: Is there a difference between df.cache() vs df.rdd.cache()

2017-10-13 Thread Weichen Xu
You should use `df.cache()`.
`df.rdd.cache()` won't work as you expect, because `df.rdd` generates a new RDD
from the original `df`, and you would then be caching that new RDD.

On Fri, Oct 13, 2017 at 3:35 PM, Supun Nakandala 
wrote:

> Hi all,
>
> I have been experimenting with the cache/persist/unpersist methods with
> respect to both the DataFrame and RDD APIs. However, I am seeing different
> behavior with the DataFrame API compared to the RDD API, such as DataFrames not
> getting cached when count() is called.
>
> Is there a difference in how these operations act with respect to the DataFrame
> and RDD APIs?
>
> Thank You.
> -Supun
>


Re: [MLlib] RowMatrix computeSVD Native ARPACK support not detecting.

2017-10-09 Thread Weichen Xu
Do you get warnings such as:
`Failed to load implementation from:
com.github.fommil.netlib.NativeSystemBLAS`
`Failed to load implementation from:
com.github.fommil.netlib.NativeRefBLAS` ?

These two warnings are logged in `com.github.fommil.netlib.BLAS`, but it
catches the original exception, so you cannot get more information from them.

I think you can use a debugger to track down the detailed error here
(attach the source code of `com.github.fommil.netlib.BLAS` first):
```
public abstract class BLAS {

  private static final String FALLBACK = "com.github.fommil.netlib.F2jBLAS";
  private static final String IMPLS =
"com.github.fommil.netlib.NativeSystemBLAS,com.github.fommil.netlib.NativeRefBLAS,com.github.fommil.netlib.F2jBLAS";
  private static final String PROPERTY_KEY =
"com.github.fommil.netlib.BLAS";
  private static final BLAS INSTANCE;
  static {
try {
  String[] classNames = System.getProperty(PROPERTY_KEY,
IMPLS).split(",");
  BLAS impl = null;
  for (String className: classNames) {
try {
  impl = load(className);
  break;
} catch (Throwable e) {
  log.warning("Failed to load implementation from: " + className);
}
  }
  if (impl == null) {
log.warning("Using the fallback implementation.");
impl = load(FALLBACK);
  }
  INSTANCE = impl;
  log.config("Implementation provided by " + INSTANCE.getClass());
} catch (Exception e) {
  throw new ExceptionInInitializerError(e);
}
  }
  ...
```
You can add a breakpoint at the `log.warning(...)` line inside the catch block,
use the debugger to inspect the value of `e.toString()`, and you will get the
exact error message explaining why loading the native BLAS library failed.

There are many possible reasons for the native BLAS library failing to load,
depending on the OS environment.



On Mon, Oct 9, 2017 at 3:46 PM, Abdullah Bashir 
wrote:

> Hi,
>
>
> I am getting the following Warning when i run the pyspark job:
>
>
> My Code is
>
>
> mat = RowMatrix(tf_rdd_vec.cache())  # RDD is cached
>
> svd = mat.computeSVD(num_topics, computeU=False)
>
>
> I am using Ubuntu 16.04 EC2 instance. And I have installed all following
> libraries into my system.
>
>
> sudo apt install libarpack2 Arpack++ libatlas-base-dev liblapacke-dev
> libblas-dev gfortran libblas-dev liblapack-dev libnetlib-java libgfortran3
> libatlas3-base libopenblas-base
>
>
> Now when I list the /usr/lib directory, it shows me the .so files:
>
>
> ubuntu:~$ ls /usr/lib/*.so | grep "pack\|blas"
>
> /usr/lib/libarpack.so
>
> /usr/lib/libblas.so
>
> /usr/lib/libcblas.so
>
> /usr/lib/libf77blas.so
>
> /usr/lib/liblapack_atlas.so
>
> /usr/lib/liblapacke.so
>
> /usr/lib/liblapack.so
>
> /usr/lib/libopenblasp-r0.2.18.so
>
> /usr/lib/libopenblas.so
>
> /usr/lib/libparpack.so
>
>
> I have adjusted LD_LIBRARY_PATH to point to above directory as well.
>
>
> export LD_LIBRARY_PATH=/var/lib/
>
>
> But I am still not able to use the native ARPACK implementation. Also, I am
> caching the RDD passed to the matrix, but it still throws the cache WARNING.
> Any suggestion on how to solve these 3 warnings?
>
>
> I have downloaded compiled version of spark-2.2.0 from the spark download
> page.
>
>
> *StackOverflow Link:* https://stackoverflow.com/questions/46612006/how-to-properly-setup-native-arpack-for-spark-2-2-0
>
>
> Best Regards,
>
>


Re: Example of GBTClassifier

2017-10-02 Thread Weichen Xu
It should be an Eclipse issue. The method is there, in the superclass
`Predictor`.

On Mon, Oct 2, 2017 at 11:51 PM, mckunkel  wrote:

> Greetings,
> I am trying to run the example in the example directory for the
> GBTClassifier. But when I view this code in Eclipse, I get an error such
> that:
> "The method setLabelCol(String) is undefined for the type GBTClassifier"
> For the line
>
> GBTClassifier gbt = new
> GBTClassifier().setLabelCol("indexedLabel").setFeaturesCol(
> "indexedFeatures")
> .setMaxIter(10);
>
> However, the API says this method is there; Eclipse says it is not.
> I did a straight copy paste, including all imports.
>
> Someone please help.
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: Replicating a row n times

2017-09-29 Thread Weichen Xu
I suggest you use `monotonicallyIncreasingId`, which is highly efficient.
But note that the IDs it generates will not be consecutive.
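
A Scala sketch combining the explode trick from the question below with `monotonically_increasing_id` for the key column (`singleRowDF` is a placeholder; the generated IDs are unique but not consecutive):

```
import org.apache.spark.sql.functions._

val n = 100
val replicated = singleRowDF
  .withColumn("dummy", explode(array((1 until n).map(lit): _*)))  // n-1 copies of each row
  .drop("dummy")
  .withColumn("ROWNUM", monotonically_increasing_id())            // unique, non-consecutive IDs

replicated.show()
```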

On Fri, Sep 29, 2017 at 3:21 PM, Kanagha Kumar 
wrote:

> Thanks for the response.
> I can use either row_number() or monotonicallyIncreasingId to generate
> unique IDs as in https://hadoopist.wordpress.com/2016/05/24/generate-unique-ids-for-each-rows-in-a-spark-dataframe/
>
> I'm looking for a Java example that uses that to replicate a single row n
> times, appending a rownum column generated as above or using the explode
> function.
>
> Ex:
>
> ds.withColumn("ROWNUM", org.apache.spark.sql.functions.explode(columnEx));
>
> columnEx needs to be of type array in order for explode to work.
>
> Any suggestions are helpful.
> Thanks
>
>
> On Thu, Sep 28, 2017 at 7:21 PM, ayan guha  wrote:
>
>> How about using row number for primary key?
>>
>> Select row_number() over (), * from table
>>
>> On Fri, 29 Sep 2017 at 10:21 am, Kanagha Kumar 
>> wrote:
>>
>>> Hi,
>>>
>>> I'm trying to replicate a single row from a dataset n times and create a
>>> new dataset from it. But while replicating, I need a column's value to be
>>> changed for each replication, since it will end up as the primary key
>>> when finally stored.
>>>
>>> Looked at the following reference: https://stackoverflow.com/questions/40397740/replicate-spark-row-n-times
>>>
>>> import org.apache.spark.sql.functions._
>>> val result = singleRowDF
>>>   .withColumn("dummy", explode(array((1 until 100).map(lit): _*)))
>>>   .selectExpr(singleRowDF.columns: _*)
>>>
>>> How can I create a column from an array of values in Java and pass it to
>>> the explode function? Suggestions are welcome.
>>>
>>>
>>> Thanks
>>> Kanagha
>>>
>> --
>> Best Regards,
>> Ayan Guha
>>
>
>


Re: Applying a Java script to many files: Java API or also Python API?

2017-09-29 Thread Weichen Xu
Although Python can launch a subprocess to run Java code, in PySpark the
processing code that needs to run in parallel on the cluster has to be written
in Python. For example, in PySpark:

def f(x):
    ...
rdd.map(f)  # the function `f` must be pure Python code

If you try to launch a subprocess to run Java code inside the function `f`, it
will add a large overhead and cause many other issues.

On Thu, Sep 28, 2017 at 5:36 PM, Giuseppe Celano <
cel...@informatik.uni-leipzig.de> wrote:

> Hi,
>
> What I meant is that I could run the Java script using the subprocess
> module in Python. In that case, is any performance difference expected
> (compared to coding directly against the Java API)? Thanks.
>
>
>
> On Sep 28, 2017, at 3:32 AM, Weichen Xu <weichen...@databricks.com> wrote:
>
> I think you have to use the Spark Java API; in PySpark, functions running on
> Spark executors (such as map functions) can only be written in Python.
>
> On Thu, Sep 28, 2017 at 12:48 AM, Giuseppe Celano <cel...@informatik.uni-
> leipzig.de> wrote:
>
>> Hi everyone,
>>
>> I would like to apply a Java script to many files in parallel. I am
>> wondering whether I should definitely use the Spark Java API, or whether I
>> could also run the script using the Python API (with which I am more
>> familiar), without this affecting performance. Thanks.
>>
>> Giuseppe
>>
>>
>>
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>>
>
>


Re: pyspark histogram

2017-09-27 Thread Weichen Xu
If you want to avoid pulling values into Python, you can use the Hive function
"histogram_numeric"; you need to set `SparkSession.enableHiveSupport()`, but
note that calling a Hive function in Spark will also slow down performance.
Spark SQL hasn't implemented "histogram_numeric" natively yet, but I think it
will be added in the future.
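
A sketch of the Hive-function route (the DataFrame `df` and column name `C1` are placeholders; the Hive UDAF `histogram_numeric(col, nb)` returns an array of (x, y) structs holding approximate bin centers and heights):

```
import org.apache.spark.sql.SparkSession

// Hive support has to be enabled when the session is created.
val spark = SparkSession.builder()
  .appName("HistogramSketch")
  .enableHiveSupport()
  .getOrCreate()

// `df` and the column name `C1` stand in for the asker's data.
df.createOrReplaceTempView("t")

val hist = spark.sql("SELECT histogram_numeric(C1, 20) AS hist FROM t")
hist.show(truncate = false)
```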

On Wed, Sep 27, 2017 at 11:50 PM, Brian Wylie 
wrote:

> Hi All,
>
> My Google/SO searching is somehow failing on this; I simply want to compute
> histograms for a column in a Spark dataframe.
>
> There are two SO hits on this question:
> - https://stackoverflow.com/questions/39154325/pyspark-show-histogram-of-a-data-frame-column
> - https://stackoverflow.com/questions/36043256/making-histogram-with-spark-dataframe-column
>
> I've actually submitted an answer to the second one, but I realize that my
> answer is wrong because, even though this code looks fine/simple... in my
> testing the flatMap is 'bad': it really slows things down
> because it's pulling each value into Python.
> # Show histogram of the 'C1' column
> bins, counts = df.select('C1').rdd.flatMap(lambda x: x).histogram(20)
> # This is a bit awkward but I believe this is the correct way to do it
> plt.hist(bins[:-1], bins=bins, weights=counts)
>
> I've looked at QuantileDiscretizer..
>
> from pyspark.ml.feature import QuantileDiscretizer
> discretizer = QuantileDiscretizer(numBuckets=20, inputCol='query_length',
> outputCol='query_hist')
> result = discretizer.fit(spark_df).transform(spark_df)
>
> but I feel like this might be the wrong path, so... the general
> question is: what's the best way to compute histograms in PySpark on columns
> that have a large number of rows?
>
>
>
>
>


Re: Applying a Java script to many files: Java API or also Python API?

2017-09-27 Thread Weichen Xu
I think you have to use the Spark Java API; in PySpark, functions running on
Spark executors (such as map functions) can only be written in Python.

On Thu, Sep 28, 2017 at 12:48 AM, Giuseppe Celano <
cel...@informatik.uni-leipzig.de> wrote:

> Hi everyone,
>
> I would like to apply a Java script to many files in parallel. I am
> wondering whether I should definitely use the Spark Java API, or whether I
> could also run the script using the Python API (with which I am more
> familiar), without this affecting performance. Thanks.
>
> Giuseppe
>
>
>
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: How to pass sparkSession from driver to executor

2017-09-21 Thread Weichen Xu
Spark does not allow using the `sparkSession` in executor code.
But I think you can move all the JSON files to one directory and then run:

```
spark.read.json("/path/to/jsonFileDir")
```
But if you want to get the file name at the same time, you can use
```
spark.sparkContext.wholeTextFiles("/path/to/jsonFileDir")...
```
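
A small sketch of the two options above (the directory path is a placeholder and `spark` is the usual SparkSession):

```
// Option 1: read every JSON file under the directory in a single driver-side call.
val allJson = spark.read.json("/path/to/jsonFileDir")

// Option 2: keep the file name next to each file's raw content.
val byFile = spark.sparkContext.wholeTextFiles("/path/to/jsonFileDir")  // RDD[(fileName, content)]
byFile.map { case (fileName, content) => (fileName, content.length) }
      .take(5)
      .foreach(println)
```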

On Thu, Sep 21, 2017 at 9:18 PM, Riccardo Ferrari 
wrote:

> Depending on your use case, however, broadcasting
> could be a better option.
>
> On Thu, Sep 21, 2017 at 2:03 PM, Chackravarthy Esakkimuthu <
> chaku.mi...@gmail.com> wrote:
>
>> Hi,
>>
>> I want to know how to pass sparkSession from driver to executor.
>>
>> I have a spark program (batch job) which does following,
>>
>> #
>>
>> val spark = SparkSession.builder().appName("SampleJob").config("spark.
>> master", "local") .getOrCreate()
>>
>> val df = this is dataframe which has list of file names (hdfs)
>>
>> df.foreach { fileName =>
>>
>>   *spark.read.json(fileName)*
>>
>>   .. some logic here
>> }
>>
>> #
>>
>>
>> *spark.read.json(fileName) --- this fails as it runs in executor. When I
>> put it outside foreach, i.e. in driver, it works.*
>>
>> I am trying to use spark (the sparkSession) in an executor, but it is not
>> visible outside the driver. I want to read HDFS files inside foreach; how
>> do I do it?
>>
>> Can someone help how to do this.
>>
>> Thanks,
>> Chackra
>>
>
>


Re: Pyspark define UDF for windows

2017-09-20 Thread Weichen Xu
A UDF cannot be used as a window function. You can use a built-in window
function or a UDAF.
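
For illustration, a Scala sketch of a built-in aggregate running over the same kind of unbounded window (DataFrame and column names are placeholders; the median polish itself would still need a UDAF):

```
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

val w = Window.partitionBy(col("col"))
  .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)

// Built-in aggregates such as avg/min/max can run over the window directly;
// an arbitrary Python UDF cannot.
val result = df.withColumn("value_avg_over_window", avg(col("value")).over(w))
```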

On Wed, Sep 20, 2017 at 7:23 PM, Simon Dirmeier 
wrote:

> Dear all,
> I am trying to partition a DataFrame into windows and then, for every
> column and window, use a custom function (UDF) via Spark's Python
> interface.
> Within that function I reshape a column of a window into an m x n matrix to
> do a median polish and afterwards return a list again.
> This doesn't work:
>
> w = Window().partitionBy(["col"]).rowsBetween(-sys.maxsize, sys.maxsize)
> def median_polish(rows, cols, values):
>     # shape values as a matrix defined by rows/cols
>     # compute the median polish
>     # cast the matrix back to a vector
>     return values
> med_pol_udf = func.udf(median_polish, DoubleType())
> for x in df.columns:
>if x.startswith("some string"):
>   df = df.withColumn(x, med_pol_udf("rows", "cols", x).over(w))
>
>
>
> The issue seems to be the windowing. Can you actually do that in Pyspark?
> Or would I need to change to Scala?
> Thanks for your help.
>
> Best,
> Simon
>


Re: Is there a SparkILoop for Java?

2017-09-20 Thread Weichen Xu
I haven't heard of one. It seems that Java does not have an official REPL.

On Wed, Sep 20, 2017 at 8:38 PM, kant kodali  wrote:

> Hi All,
>
> I am wondering if there is a SparkILoop equivalent for
> Java, so I can pass Java code as a string to a REPL?
>
> Thanks!
>


Re: for loops in pyspark

2017-09-20 Thread Weichen Xu
Spark manages memory allocation and release automatically. Can you post the
complete program? That would help in checking what is going wrong.

On Wed, Sep 20, 2017 at 8:12 PM, Alexander Czech <
alexander.cz...@googlemail.com> wrote:

> Hello all,
>
> I'm running a PySpark script that makes use of a for loop to create smaller
> chunks of my main dataset.
>
> some example code:
>
> for chunk in chunks:
> my_rdd = sc.parallelize(chunk).flatmap(somefunc)
> # do some stuff with my_rdd
>
> my_df = make_df(my_rdd)
> # do some stuff with my_df
> my_df.write.parquet('./some/path')
>
> After a couple of iterations I always start to lose executors because of
> out-of-memory errors. Is there a way to free up memory after a loop iteration?
> Do I have to do it in Python or with Spark?
>
> Thanks
>