NoSuchElementException

2016-11-04 Thread Lev Tsentsiper
My code throws an exception when I am trying to create new DataSet from within 
SteamWriter sink

Simplified version of the code

  val df = sparkSession.readStream
.format("json")
.option("nullValue", " ")
.option("headerFlag", "true")
.option("spark.sql.shuffle.partitions", 1)
.option("mode", "FAILFAST")
.schema(tableSchema)
.load(s"s3n://")
df.writeStream
//TODO Switch to S3 location
//.option("checkpointLocation", s"$input/$tenant/checkpoints/")
.option("checkpointLocation", "/tmp/checkpoins/test1")
.foreach(new ForwachWriter() {
   
 override def close() = {
val sparkSession = SparkSession.builder()
  .config(new SparkConf()
.setAppName("zzz").set("spark.app.id", ""xxx)
.set("spark.master", "local[1]")
  ).getOrCreate()

val data = sparkSession.createDataset(rowList).
.createOrReplaceTempView(tempTableName)
 val sql =   sparkSession.sql("")
sql.repartition(1).foreachPartition(iter=> {})
 }

});

This code throws an exception

java.util.NoSuchElementException: key not found: 202
at scala.collection.MapLike$class.default(MapLike.scala:228)
at scala.collection.AbstractMap.default(Map.scala:59)
at scala.collection.mutable.HashMap.apply(HashMap.scala:65)
at 
org.apache.spark.storage.BlockInfoManager.lockForReading(BlockInfoManager.scala:196)
at 
org.apache.spark.storage.BlockManager.getLocalValues(BlockManager.scala:421)
at 
org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:178)
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1253)
at 
org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:174)
at 
org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:65)
at 
org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:65)
at 
org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:89)
at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
at 
org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.prepareBroadcast(BroadcastHashJoinExec.scala:101)
at 
org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.codegenOuter(BroadcastHashJoinExec.scala:242)
at 
org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.doConsume(BroadcastHashJoinExec.scala:83)
at 
org.apache.spark.sql.execution.CodegenSupport$class.consume(WholeStageCodegenExec.scala:153)
at 
org.apache.spark.sql.execution.RowDataSourceScanExec.consume(ExistingRDD.scala:150)
at 
org.apache.spark.sql.execution.RowDataSourceScanExec.doProduce(ExistingRDD.scala:217)
at 
org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:83)
at 
org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:78)
at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136)
at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at 
org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133)
at 
org.apache.spark.sql.execution.CodegenSupport$class.produce(WholeStageCodegenExec.scala:78)
at 
org.apache.spark.sql.execution.RowDataSourceScanExec.produce(ExistingRDD.scala:150)
at 
org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.doProduce(BroadcastHashJoinExec.scala:77)
at 
org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:83)
at 
org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:78)
at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136)
at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at 
org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133)
at 
org.apache.spark.sql.execution.CodegenSupport$class.produce(WholeStageCodegenExec.scala:78)
at 
org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.produce(BroadcastHashJoinExec.scala:38)
at 
org.apache.spark.sql.execution.ProjectExec.doProduce(basicPhysicalOperators.scala:40)
at 
org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:83)
at 
org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:78)
at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136)
at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at 
org.apache.spark.sql.execution.SparkPlan.executeQuer

Upgrading to Spark 2.0.1 broke array in parquet DataFrame

2016-11-04 Thread Sam Goodwin
I have a table with a few columns, some of which are arrays. Since
upgrading from Spark 1.6 to Spark 2.0.1, the array fields are always null
when reading in a DataFrame.

When writing the Parquet files, the schema of the column is specified as

StructField("packageIds",ArrayType(StringType))

The schema of the column in the Hive Metastore is

packageIds array

The schema used in the writer exactly matches the schema in the Metastore
in all ways (order, casing, types etc)

The query is a simple "select *"

spark.sql("select * from tablename limit 1").collect() // null columns in Row

How can I begin debugging this issue? Notable things I've already
investigated:

   - Files were written using Spark 1.6
   - DataFrame works in spark 1.5 and 1.6
   - I've inspected the parquet files using parquet-tools and can see the
   data.
   - I also have another table written in exactly the same way and it
   doesn't have the issue.


Re: sanboxing spark executors

2016-11-04 Thread Michael Gummelt
Mesos will let you run in docker containers, so you get filesystem
isolation, and we're about to merge CNI support:
https://github.com/apache/spark/pull/15740, which would allow you to set up
network policies.  Though you might be able to achieve whatever network
isolation you need without CNI, depending on your requirements.

As far as unauthenticated HDFS clusters, I would recommend against running
untrusted code on the same network as your secure HDFS cluster.

On Fri, Nov 4, 2016 at 4:13 PM, blazespinnaker 
wrote:

> In particular, we need to make sure the RDDs execute the lambda functions
> securely as they are provided by user code.
>
>
>
> --
> View this message in context: http://apache-spark-user-list.
> 1001560.n3.nabble.com/sanboxing-spark-executors-tp28014p28024.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


-- 
Michael Gummelt
Software Engineer
Mesosphere


Re: sanboxing spark executors

2016-11-04 Thread blazespinnaker
In particular, we need to make sure the RDDs execute the lambda functions
securely as they are provided by user code.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/sanboxing-spark-executors-tp28014p28024.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



java.util.NoSuchElementException when trying to use dataset from worker

2016-11-04 Thread levt
My code throws an exception when I am trying to create new DataSet from
within SteamWriter sink 

Simplified version of the code 

  val df = sparkSession.readStream 
    .format("json") 
    .option("nullValue", " ") 
    .option("headerFlag", "true") 
    .option("spark.sql.shuffle.partitions", 1) 
    .option("mode", "FAILFAST") 
    .schema(tableSchema) 
    .load(s"s3n://") 
df.writeStream 
    //TODO Switch to S3 location 
    //.option("checkpointLocation", s"$input/$tenant/checkpoints/") 
    .option("checkpointLocation", "/tmp/checkpoins/test1") 
    .foreach(new ForwachWriter() { 
                
     override def close() = { 
        val sparkSession = SparkSession.builder() 
          .config(new SparkConf() 
            .setAppName("zzz").set("spark.app.id", ""xxx) 
            .set("spark.master", "local[1]") 
          ).getOrCreate() 

            val data = sparkSession.createDataset(rowList). 
            .createOrReplaceTempView(tempTableName) 
             val sql =   sparkSession.sql("") 
            sql.repartition(1).foreachPartition(iter=> {}) 
     } 

}); 
  
This code throws an exception 

java.util.NoSuchElementException: key not found: 202 
        at scala.collection.MapLike$class.default(MapLike.scala:228) 
        at scala.collection.AbstractMap.default(Map.scala:59) 
        at scala.collection.mutable.HashMap.apply(HashMap.scala:65) 
        at
org.apache.spark.storage.BlockInfoManager.lockForReading(BlockInfoManager.scala:196)
 
        at
org.apache.spark.storage.BlockManager.getLocalValues(BlockManager.scala:421) 
        at
org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:178)
 
        at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1253) 
        at
org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:174)
 
        at
org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:65)
 
        at
org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:65) 
        at
org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:89) 
        at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70) 
        at
org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.prepareBroadcast(BroadcastHashJoinExec.scala:101)
 
        at
org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.codegenOuter(BroadcastHashJoinExec.scala:242)
 
        at
org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.doConsume(BroadcastHashJoinExec.scala:83)
 
        at
org.apache.spark.sql.execution.CodegenSupport$class.consume(WholeStageCodegenExec.scala:153)
 
        at
org.apache.spark.sql.execution.RowDataSourceScanExec.consume(ExistingRDD.scala:150)
 
        at
org.apache.spark.sql.execution.RowDataSourceScanExec.doProduce(ExistingRDD.scala:217)
 
        at
org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:83)
 
        at
org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:78)
 
        at
org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136)
 
        at
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) 
        at
org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133) 
        at
org.apache.spark.sql.execution.CodegenSupport$class.produce(WholeStageCodegenExec.scala:78)
 
        at
org.apache.spark.sql.execution.RowDataSourceScanExec.produce(ExistingRDD.scala:150)
 
        at
org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.doProduce(BroadcastHashJoinExec.scala:77)
 
        at
org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:83)
 
        at
org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:78)
 
        at
org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136)
 
        at
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) 
        at
org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133) 
        at
org.apache.spark.sql.execution.CodegenSupport$class.produce(WholeStageCodegenExec.scala:78)
 
        at
org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.produce(BroadcastHashJoinExec.scala:38)
 
        at
org.apache.spark.sql.execution.ProjectExec.doProduce(basicPhysicalOperators.scala:40)
 
        at
org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:83)
 
        at
org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:78)
 
        at
org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136)
 
        at
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) 
   

Re: java.lang.ClassNotFoundException: org.apache.spark.sql.SparkSession$ . Please Help!!!!!!!

2016-11-04 Thread shyla deshpande
I feel so good that Holden replied.

Yes, that was the problem. I was running from Intellij, I removed the
provided scope and works great.

Thanks a lot.

On Fri, Nov 4, 2016 at 2:05 PM, Holden Karau  wrote:

> It seems like you've marked the spark jars as provided, in this case they
> would only be provided you run your application with spark-submit or
> otherwise have Spark's JARs on your class path. How are you launching your
> application?
>
> On Fri, Nov 4, 2016 at 2:00 PM, shyla deshpande 
> wrote:
>
>> object App {
>>
>>
>>  import org.apache.spark.sql.functions._
>> import org.apache.spark.sql.SparkSession
>>
>>   def main(args : Array[String]) {
>> println( "Hello World!" )
>>   val sparkSession = SparkSession.builder.
>>   master("local")
>>   .appName("spark session example")
>>   .getOrCreate()
>>   }
>>
>> }
>>
>>
>> 
>>   1.8
>>   1.8
>>   UTF-8
>>   2.11.8
>>   2.11
>> 
>>
>> 
>>   
>> org.scala-lang
>> scala-library
>> ${scala.version}
>>   
>>
>>   
>>   org.apache.spark
>>   spark-core_2.11
>>   2.0.1
>>   provided
>>   
>>   
>>   org.apache.spark
>>   spark-sql_2.11
>>   2.0.1
>>   provided
>>   
>>
>>   
>> org.specs2
>> specs2-core_${scala.compat.version}
>> 2.4.16
>> test
>>   
>> 
>>
>> 
>>   src/main/scala
>> 
>>
>>
>
>
> --
> Cell : 425-233-8271
> Twitter: https://twitter.com/holdenkarau
>


Re: sanboxing spark executors

2016-11-04 Thread Calvin Jia
Hi,

If you are using the latest Alluxio release (1.3.0), authorization is
enabled, preventing users from accessing data they do not have permissions
to. For older versions, you will need to enable the security flag. The
documentation
on security  has more
details.

Hope this helps,
Calvin

On Fri, Nov 4, 2016 at 6:31 AM, Andrew Holway <
andrew.hol...@otternetworks.de> wrote:

> I think running it on a Mesos cluster could give you better control over
> this kinda stuff.
>
>
> On Fri, Nov 4, 2016 at 7:41 AM, blazespinnaker 
> wrote:
>
>> Is there a good method / discussion / documentation on how to sandbox a
>> spark
>> executor?   Assume the code is untrusted and you don't want it to be able
>> to
>> make un validated network connections or do unvalidated alluxio/hdfs/file
>> io.
>>
>>
>>
>>
>> --
>> View this message in context: http://apache-spark-user-list.
>> 1001560.n3.nabble.com/sanboxing-spark-executors-tp28014.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>>
>
>
> --
> Otter Networks UG
> http://otternetworks.de
> Gotenstraße 17
> 10829 Berlin
>


Re: java.lang.ClassNotFoundException: org.apache.spark.sql.SparkSession$ . Please Help!!!!!!!

2016-11-04 Thread Holden Karau
It seems like you've marked the spark jars as provided, in this case they
would only be provided you run your application with spark-submit or
otherwise have Spark's JARs on your class path. How are you launching your
application?

On Fri, Nov 4, 2016 at 2:00 PM, shyla deshpande 
wrote:

> object App {
>
>
>  import org.apache.spark.sql.functions._
> import org.apache.spark.sql.SparkSession
>
>   def main(args : Array[String]) {
> println( "Hello World!" )
>   val sparkSession = SparkSession.builder.
>   master("local")
>   .appName("spark session example")
>   .getOrCreate()
>   }
>
> }
>
>
> 
>   1.8
>   1.8
>   UTF-8
>   2.11.8
>   2.11
> 
>
> 
>   
> org.scala-lang
> scala-library
> ${scala.version}
>   
>
>   
>   org.apache.spark
>   spark-core_2.11
>   2.0.1
>   provided
>   
>   
>   org.apache.spark
>   spark-sql_2.11
>   2.0.1
>   provided
>   
>
>   
> org.specs2
> specs2-core_${scala.compat.version}
> 2.4.16
> test
>   
> 
>
> 
>   src/main/scala
> 
>
>


-- 
Cell : 425-233-8271
Twitter: https://twitter.com/holdenkarau


java.lang.ClassNotFoundException: org.apache.spark.sql.SparkSession$ . Please Help!!!!!!!

2016-11-04 Thread shyla deshpande
object App {


 import org.apache.spark.sql.functions._
import org.apache.spark.sql.SparkSession

  def main(args : Array[String]) {
println( "Hello World!" )
  val sparkSession = SparkSession.builder.
  master("local")
  .appName("spark session example")
  .getOrCreate()
  }

}



  1.8
  1.8
  UTF-8
  2.11.8
  2.11



  
org.scala-lang
scala-library
${scala.version}
  

  
  org.apache.spark
  spark-core_2.11
  2.0.1
  provided
  
  
  org.apache.spark
  spark-sql_2.11
  2.0.1
  provided
  

  
org.specs2
specs2-core_${scala.compat.version}
2.4.16
test
  



  src/main/scala



NoSuchElementException when trying to use dataset

2016-11-04 Thread levt
My code throw an exception when I am trying to create new DataSet from within
SteamWriter sink 

Simplified version of the code 

  val df = sparkSession.readStream 
    .format("json") 
    .option("nullValue", " ") 
    .option("headerFlag", "true") 
    .option("spark.sql.shuffle.partitions", 1) 
    .option("mode", "FAILFAST") 
    .schema(tableSchema) 
    .load(s"s3n://") 
df.writeStream 
    //TODO Switch to S3 location 
    //.option("checkpointLocation", s"$input/$tenant/checkpoints/") 
    .option("checkpointLocation", "/tmp/checkpoins/test1") 
    .foreach(new ForwachWriter() { 
                
     override def close() = { 
        val sparkSession = SparkSession.builder() 
          .config(new SparkConf() 
            .setAppName("zzz").set("spark.app.id", ""xxx) 
            .set("spark.master", "local[1]") 
          ).getOrCreate() 

            val data = sparkSession.createDataset(rowList). 
            .createOrReplaceTempView(tempTableName) 
             val sql =   sparkSession.sql("") 
            sql.repartition(1).foreachPartition(iter=> {}) 
     } 

}); 
  
This code throws an exception 

java.util.NoSuchElementException: key not found: 202 
        at scala.collection.MapLike$class.default(MapLike.scala:228) 
        at scala.collection.AbstractMap.default(Map.scala:59) 
        at scala.collection.mutable.HashMap.apply(HashMap.scala:65) 
        at
org.apache.spark.storage.BlockInfoManager.lockForReading(BlockInfoManager.scala:196)
 
        at
org.apache.spark.storage.BlockManager.getLocalValues(BlockManager.scala:421) 
        at
org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:178)
 
        at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1253) 
        at
org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:174)
 
        at
org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:65)
 
        at
org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:65) 
        at
org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:89) 
        at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70) 
        at
org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.prepareBroadcast(BroadcastHashJoinExec.scala:101)
 
        at
org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.codegenOuter(BroadcastHashJoinExec.scala:242)
 
        at
org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.doConsume(BroadcastHashJoinExec.scala:83)
 
        at
org.apache.spark.sql.execution.CodegenSupport$class.consume(WholeStageCodegenExec.scala:153)
 
        at
org.apache.spark.sql.execution.RowDataSourceScanExec.consume(ExistingRDD.scala:150)
 
        at
org.apache.spark.sql.execution.RowDataSourceScanExec.doProduce(ExistingRDD.scala:217)
 
        at
org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:83)
 
        at
org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:78)
 
        at
org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136)
 
        at
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) 
        at
org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133) 
        at
org.apache.spark.sql.execution.CodegenSupport$class.produce(WholeStageCodegenExec.scala:78)
 
        at
org.apache.spark.sql.execution.RowDataSourceScanExec.produce(ExistingRDD.scala:150)
 
        at
org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.doProduce(BroadcastHashJoinExec.scala:77)
 
        at
org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:83)
 
        at
org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:78)
 
        at
org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136)
 
        at
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) 
        at
org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133) 
        at
org.apache.spark.sql.execution.CodegenSupport$class.produce(WholeStageCodegenExec.scala:78)
 
        at
org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.produce(BroadcastHashJoinExec.scala:38)
 
        at
org.apache.spark.sql.execution.ProjectExec.doProduce(basicPhysicalOperators.scala:40)
 
        at
org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:83)
 
        at
org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:78)
 
        at
org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136)
 
        at
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) 
    

Spark Float to VectorUDT for ML evaluator lib

2016-11-04 Thread Manish Tripathi
Hi

I am trying to run the ML Binary Evaluation Classifier metrics to compare
the rating with predicted values and get the AreaROC.

My dataframe has two columns with rating as int (I have binarized it) and
predicitions which is a float.

When I pass it to the ML evaluator method I get an error as shown below:

Can someone help me with gettng this sorted out?. Appreciate all the help

Stackoverflow post:

http://stackoverflow.com/questions/40408898/converting-the-float-column-in-spark-dataframe-to-vectorudt


I was trying to use the pyspark.ml.evaluation Binary classification metric
like below

evaluator = BinaryClassificationEvaluator(rawPredictionCol="prediction")

print evaluator.evaluate(predictions)

My Predictions data frame looks like this:

predictions.select('rating','prediction')

predictions.show()

+--++

|rating|  prediction|

+--++

| 1|  0.14829934|

| 1|-0.017862909|

| 1|   0.4951505|

| 1|0.0074382657|

| 1|-0.002562912|

| 1|   0.0208337|

| 1| 0.049362548|

| 1|  0.0969|

| 1|  0.17998546|

| 1| 0.019649783|

| 1| 0.031353004|

| 1|  0.03657037|

| 1|  0.23280995|

| 1| 0.033190556|

| 1|  0.35569906|

| 1| 0.030974165|

| 1|   0.1422375|

| 1|  0.19786166|

| 1|  0.07740938|

| 1|  0.33970386|

+--++

only showing top 20 rows

The datatype of each column is as follows:

predictions.printSchema()

root

 |-- rating: integer (nullable = true)

 |-- prediction: float (nullable = true)

Now I get an error with above Ml code saying prediction column is Float and
expected a VectorUDT.

/Users/i854319/spark/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py in
__call__(self, *args)

811 answer = self.gateway_client.send_command(command)

812 return_value = get_return_value(

--> 813 answer, self.gateway_client, self.target_id, self.name)

814

815 for temp_arg in temp_args:


/Users/i854319/spark/python/pyspark/sql/utils.pyc in deco(*a, **kw)

 51 raise AnalysisException(s.split(': ', 1)[1],
stackTrace)

 52 if s.startswith('java.lang.IllegalArgumentException: '):

---> 53 raise IllegalArgumentException(s.split(': ', 1)[1],
stackTrace)

 54 raise

 55 return deco


IllegalArgumentException: u'requirement failed: Column prediction must be
of type org.apache.spark.mllib.linalg.VectorUDT@f71b0bce but was actually
FloatType.'

So I thought of converting the predictions column from float to VectorUDT
as below:

*Applying the schema to the dataframe to convert the float column type to
VectorUDT*

from pyspark.sql.types import IntegerType, StructType,StructField


schema = StructType([

StructField("rating", IntegerType, True),

StructField("prediction", VectorUDT(), True)

])



predictions_dtype=sqlContext.createDataFrame(prediction,schema)

But Now I get this error.

---

AssertionErrorTraceback (most recent call last)

 in ()

  4

  5 schema = StructType([

> 6 StructField("rating", IntegerType, True),

  7 StructField("prediction", VectorUDT(), True)

  8 ])


/Users/i854319/spark/python/pyspark/sql/types.pyc in __init__(self, name,
dataType, nullable, metadata)

401 False

402 """

--> 403 assert isinstance(dataType, DataType), "dataType should be
DataType"

404 if not isinstance(name, str):

405 name = name.encode('utf-8')


AssertionError: dataType should be DataType

It takes so much time to run an ml algo in spark libraries with so many
weird errors. Even I tried Mllib with RDD data. That is giving the
ValueError: Null pointer exception.



ᐧ


Re: GenericRowWithSchema cannot be cast to java.lang.Double : UDAF error

2016-11-04 Thread Manjunath, Kiran
Just to add more clarity on where the issue occurs –

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in 
stage 1.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1.0 (TID 
2, localhost): java.lang.ClassCastException: 
org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema cannot be cast 
to java.lang.Double
at scala.runtime.BoxesRunTime.unboxToDouble(BoxesRunTime.java:114)
at scala.runtime.ScalaRunTime$.array_update(ScalaRunTime.scala:93)
at 
scala.collection.IndexedSeqOptimized$class.copyToArray(IndexedSeqOptimized.scala:180)
at scala.collection.mutable.WrappedArray.copyToArray(WrappedArray.scala:35)
at scala.collection.TraversableOnce$class.copyToArray(TraversableOnce.scala:278)
at scala.collection.AbstractTraversable.copyToArray(Traversable.scala:104)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:286)
at scala.collection.mutable.WrappedArray.toArray(WrappedArray.scala:73)
at GeometricMean.evaluate(:51)


Regards,
Kiran

From: "Manjunath, Kiran" 
Date: Saturday, November 5, 2016 at 2:16 AM
To: "user@spark.apache.org" 
Subject: GenericRowWithSchema cannot be cast to java.lang.Double : UDAF error

I am trying to implement a sample “sum” functionality over rolling window.
Below code may not make sense (may not be efficient) but during the course of 
other major implementation, have stumbled on below error which is blocking.

Error Obtained -  “GenericRowWithSchema cannot be cast to java.lang.Double” 
during evaluation.
Is this a known problem in Spark?
I am using 2.0.0

Code


class GeometricMean extends UserDefinedAggregateFunction {

def inputSchema: StructType = StructType(StructField("value", DoubleType) 
:: Nil)
def bufferSchema: StructType = new StructType().add("info", 
ArrayType(DoubleType),false)


def dataType : DataType =  DoubleType
def deterministic: Boolean = true

def initialize(buffer: MutableAggregationBuffer): Unit = {
buffer(0) = ArrayBuffer.empty[Double]

}

def update(buffer: MutableAggregationBuffer,input: Row): Unit = {
val arr1 = buffer.getAs[Seq[Double]](0)
val arr = ArrayBuffer(input) ++ arr1
buffer(0) = arr
}

def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
val arr1 = buffer1.getAs[Seq[Double]](0)
val arr = arr1 ++  buffer2.getAs[Seq[Double]](0)
buffer1.update(0,arr)
}

def evaluate(buffer: Row): Any = {
var s : Double = 0
val arr = buffer.getAs[Seq[Double]](0)
val arrd = arr.toArray
arrd.foreach(s += _)
s
}
}

val GM  = new GeometricMean
val r = new scala.util.Random(88)
val schema = new StructType().add("id",IntegerType).add("Count",IntegerType)

val rnNum1 = for( i <- 1 to 10) yield { Row(i,r.nextInt(10-0+1)) }

val wSpec1 = Window.orderBy("id").rowsBetween(-1, +3)
val rdd = sc.parallelize(rnNum)
val df = sqlContext.createDataFrame(rdd,schema)
val dfWithMovingAvg = df.withColumn( 
"movingAvg",avg(df.col("Count")).over(wSpec1)).withColumn("customMovingAvg",GM(df.col("Count")).over(wSpec1))
dfWithMovingAvg.take(5).foreach(println)


Error
===


org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in 
stage 1.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1.0 (TID 
2, localhost): java.lang.ClassCastException: 
org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema cannot be cast 
to java.lang.Double
at scala.runtime.BoxesRunTime.unboxToDouble(BoxesRunTime.java:114)



Regards,
Kiran


Re: Instability issues with Spark 2.0.1 and Kafka 0.10

2016-11-04 Thread Cody Koeninger
So basically what I am saying is

- increase poll.ms
- use a separate group id everywhere
- stop committing offsets under the covers

That should eliminate all of those as possible causes, and then we can
see if there are still issues.

As far as 0.8 vs 0.10, Spark doesn't require you to assign or
subscribe to a topic in order to update offsets, Kafka does.  If you
don't like the new Kafka consumer api, the existing 0.8 simple
consumer api should be usable with later brokers.  As long as you
don't need SSL or dynamic subscriptions, and it meets your needs, keep
using it.

On Fri, Nov 4, 2016 at 3:37 PM, Ivan von Nagy  wrote:
> Yes, the parallel KafkaRDD uses the same consumer group, but each RDD uses a
> single distinct topic. For example, the group would be something like
> "storage-group", and the topics would be "storage-channel1", and
> "storage-channel2". In each thread a KafkaConsumer is started, assigned the
> partitions assigned, and then commit offsets are called after the RDD is
> processed. This should not interfere with the consumer group used by the
> executors which would be "spark-executor-storage-group".
>
> In the streaming example there is a single topic ("client-events") and group
> ("processing-group"). A single stream is created and offsets are manually
> updated from the executor after each partition is handled. This was a
> challenge since Spark now requires one to assign or subscribe to a topic in
> order to even update the offsets. In 0.8.2.x you did not have to worry about
> that. This approach limits your exposure to duplicate data since idempotent
> records are not entirely possible in our scenario. At least without a lot of
> re-running of logic to de-dup.
>
> Thanks,
>
> Ivan
>
> On Fri, Nov 4, 2016 at 1:24 PM, Cody Koeninger  wrote:
>>
>> So just to be clear, the answers to my questions are
>>
>> - you are not using different group ids, you're using the same group
>> id everywhere
>>
>> - you are committing offsets manually
>>
>> Right?
>>
>> If you want to eliminate network or kafka misbehavior as a source,
>> tune poll.ms upwards even higher.
>>
>> You must use different group ids for different rdds or streams.
>> Kafka consumers won't behave the way you expect if they are all in the
>> same group id, and the consumer cache is keyed by group id. Yes, the
>> executor will tack "spark-executor-" on to the beginning, but if you
>> give it the same base group id, it will be the same.  And the driver
>> will use the group id you gave it, unmodified.
>>
>> Finally, I really can't help you if you're manually writing your own
>> code to commit offsets directly to Kafka.  Trying to minimize
>> duplicates that way doesn't really make sense, your system must be
>> able to handle duplicates if you're using kafka as an offsets store,
>> it can't do transactional exactly once.
>>
>> On Fri, Nov 4, 2016 at 1:48 PM, vonnagy  wrote:
>> > Here are some examples and details of the scenarios. The KafkaRDD is the
>> > most
>> > error prone to polling
>> > timeouts and concurrentm modification errors.
>> >
>> > *Using KafkaRDD* - This takes a list of channels and processes them in
>> > parallel using the KafkaRDD directly. they all use the same consumer
>> > group
>> > ('storage-group'), but each has it's own topic and each topic has 4
>> > partitions. We routinely get timeout errors when polling for data. This
>> > occurs whether we process in parallel or sequentially.
>> >
>> > *Spark Kafka setting:*
>> > spark.streaming.kafka.consumer.poll.ms=2000
>> >
>> > *Kafka Consumer Params:*
>> > metric.reporters = []
>> > metadata.max.age.ms = 30
>> > partition.assignment.strategy =
>> > [org.apache.kafka.clients.consumer.RangeAssignor]
>> > reconnect.backoff.ms = 50
>> > sasl.kerberos.ticket.renew.window.factor = 0.8
>> > max.partition.fetch.bytes = 1048576
>> > bootstrap.servers = [somemachine:31000]
>> > ssl.keystore.type = JKS
>> > enable.auto.commit = false
>> > sasl.mechanism = GSSAPI
>> > interceptor.classes = null
>> > exclude.internal.topics = true
>> > ssl.truststore.password = null
>> > client.id =
>> > ssl.endpoint.identification.algorithm = null
>> > max.poll.records = 1000
>> > check.crcs = true
>> > request.timeout.ms = 4
>> > heartbeat.interval.ms = 3000
>> > auto.commit.interval.ms = 5000
>> > receive.buffer.bytes = 65536
>> > ssl.truststore.type = JKS
>> > ssl.truststore.location = null
>> > ssl.keystore.password = null
>> > fetch.min.bytes = 1
>> > send.buffer.bytes = 131072
>> > value.deserializer = class
>> > com.vadio.analytics.spark.storage.ClientEventJsonOptionDeserializer
>> > group.id = storage-group
>> > retry.backoff.ms = 100
>> > sasl.kerberos.kinit.cmd = /usr/bin/kinit
>> > sasl.kerberos.service.name = null
>> > sasl.kerberos.ticket.renew.jitter = 0.05
>> > ssl.trustmanager.algorithm = PKIX
>> > ssl.key.password = null
>> > fetch.max.wait.ms = 500
>> > sasl.kerberos.min.time.before.relogin = 6
>> > connections.max.idle.ms = 54
>> > session.timeout.ms = 300

GenericRowWithSchema cannot be cast to java.lang.Double : UDAF error

2016-11-04 Thread Manjunath, Kiran
I am trying to implement a sample “sum” functionality over rolling window.
Below code may not make sense (may not be efficient) but during the course of 
other major implementation, have stumbled on below error which is blocking.

Error Obtained -  “GenericRowWithSchema cannot be cast to java.lang.Double” 
during evaluation.
Is this a known problem in Spark?
I am using 2.0.0

Code


class GeometricMean extends UserDefinedAggregateFunction {

def inputSchema: StructType = StructType(StructField("value", DoubleType) 
:: Nil)
def bufferSchema: StructType = new StructType().add("info", 
ArrayType(DoubleType),false)


def dataType : DataType =  DoubleType
def deterministic: Boolean = true

def initialize(buffer: MutableAggregationBuffer): Unit = {
buffer(0) = ArrayBuffer.empty[Double]

}

def update(buffer: MutableAggregationBuffer,input: Row): Unit = {
val arr1 = buffer.getAs[Seq[Double]](0)
val arr = ArrayBuffer(input) ++ arr1
buffer(0) = arr
}

def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
val arr1 = buffer1.getAs[Seq[Double]](0)
val arr = arr1 ++  buffer2.getAs[Seq[Double]](0)
buffer1.update(0,arr)
}

def evaluate(buffer: Row): Any = {
var s : Double = 0
val arr = buffer.getAs[Seq[Double]](0)
val arrd = arr.toArray
arrd.foreach(s += _)
s
}
}

val GM  = new GeometricMean
val r = new scala.util.Random(88)
val schema = new StructType().add("id",IntegerType).add("Count",IntegerType)

val rnNum1 = for( i <- 1 to 10) yield { Row(i,r.nextInt(10-0+1)) }

val wSpec1 = Window.orderBy("id").rowsBetween(-1, +3)
val rdd = sc.parallelize(rnNum)
val df = sqlContext.createDataFrame(rdd,schema)
val dfWithMovingAvg = df.withColumn( 
"movingAvg",avg(df.col("Count")).over(wSpec1)).withColumn("customMovingAvg",GM(df.col("Count")).over(wSpec1))
dfWithMovingAvg.take(5).foreach(println)


Error
===


org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in 
stage 1.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1.0 (TID 
2, localhost): java.lang.ClassCastException: 
org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema cannot be cast 
to java.lang.Double
at scala.runtime.BoxesRunTime.unboxToDouble(BoxesRunTime.java:114)



Regards,
Kiran


Re: Instability issues with Spark 2.0.1 and Kafka 0.10

2016-11-04 Thread Ivan von Nagy
Yes, the parallel KafkaRDD uses the same consumer group, but each RDD uses
a single distinct topic. For example, the group would be something like
"storage-group", and the topics would be "storage-channel1", and
"storage-channel2". In each thread a KafkaConsumer is started, assigned the
partitions assigned, and then commit offsets are called after the RDD is
processed. This should not interfere with the consumer group used by the
executors which would be "spark-executor-storage-group".

In the streaming example there is a single topic ("client-events") and
group ("processing-group"). A single stream is created and offsets are
manually updated from the executor after each partition is handled. This
was a challenge since Spark now requires one to assign or subscribe to a
topic in order to even update the offsets. In 0.8.2.x you did not have to
worry about that. This approach limits your exposure to duplicate data
since idempotent records are not entirely possible in our scenario. At
least without a lot of re-running of logic to de-dup.

Thanks,

Ivan

On Fri, Nov 4, 2016 at 1:24 PM, Cody Koeninger  wrote:

> So just to be clear, the answers to my questions are
>
> - you are not using different group ids, you're using the same group
> id everywhere
>
> - you are committing offsets manually
>
> Right?
>
> If you want to eliminate network or kafka misbehavior as a source,
> tune poll.ms upwards even higher.
>
> You must use different group ids for different rdds or streams.
> Kafka consumers won't behave the way you expect if they are all in the
> same group id, and the consumer cache is keyed by group id. Yes, the
> executor will tack "spark-executor-" on to the beginning, but if you
> give it the same base group id, it will be the same.  And the driver
> will use the group id you gave it, unmodified.
>
> Finally, I really can't help you if you're manually writing your own
> code to commit offsets directly to Kafka.  Trying to minimize
> duplicates that way doesn't really make sense, your system must be
> able to handle duplicates if you're using kafka as an offsets store,
> it can't do transactional exactly once.
>
> On Fri, Nov 4, 2016 at 1:48 PM, vonnagy  wrote:
> > Here are some examples and details of the scenarios. The KafkaRDD is the
> most
> > error prone to polling
> > timeouts and concurrentm modification errors.
> >
> > *Using KafkaRDD* - This takes a list of channels and processes them in
> > parallel using the KafkaRDD directly. they all use the same consumer
> group
> > ('storage-group'), but each has it's own topic and each topic has 4
> > partitions. We routinely get timeout errors when polling for data. This
> > occurs whether we process in parallel or sequentially.
> >
> > *Spark Kafka setting:*
> > spark.streaming.kafka.consumer.poll.ms=2000
> >
> > *Kafka Consumer Params:*
> > metric.reporters = []
> > metadata.max.age.ms = 30
> > partition.assignment.strategy =
> > [org.apache.kafka.clients.consumer.RangeAssignor]
> > reconnect.backoff.ms = 50
> > sasl.kerberos.ticket.renew.window.factor = 0.8
> > max.partition.fetch.bytes = 1048576
> > bootstrap.servers = [somemachine:31000]
> > ssl.keystore.type = JKS
> > enable.auto.commit = false
> > sasl.mechanism = GSSAPI
> > interceptor.classes = null
> > exclude.internal.topics = true
> > ssl.truststore.password = null
> > client.id =
> > ssl.endpoint.identification.algorithm = null
> > max.poll.records = 1000
> > check.crcs = true
> > request.timeout.ms = 4
> > heartbeat.interval.ms = 3000
> > auto.commit.interval.ms = 5000
> > receive.buffer.bytes = 65536
> > ssl.truststore.type = JKS
> > ssl.truststore.location = null
> > ssl.keystore.password = null
> > fetch.min.bytes = 1
> > send.buffer.bytes = 131072
> > value.deserializer = class
> > com.vadio.analytics.spark.storage.ClientEventJsonOptionDeserializer
> > group.id = storage-group
> > retry.backoff.ms = 100
> > sasl.kerberos.kinit.cmd = /usr/bin/kinit
> > sasl.kerberos.service.name = null
> > sasl.kerberos.ticket.renew.jitter = 0.05
> > ssl.trustmanager.algorithm = PKIX
> > ssl.key.password = null
> > fetch.max.wait.ms = 500
> > sasl.kerberos.min.time.before.relogin = 6
> > connections.max.idle.ms = 54
> > session.timeout.ms = 3
> > metrics.num.samples = 2
> > key.deserializer = class
> > org.apache.kafka.common.serialization.StringDeserializer
> > ssl.protocol = TLS
> > ssl.provider = null
> > ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
> > ssl.keystore.location = null
> > ssl.cipher.suites = null
> > security.protocol = PLAINTEXT
> > ssl.keymanager.algorithm = SunX509
> > metrics.sample.window.ms = 3
> > auto.offset.reset = earliest
> >
> > *Example usage with KafkaRDD:*
> > val channels = Seq("channel1", "channel2")
> >
> > channels.toParArray.foreach { channel =>
> >   val consumer = new KafkaConsumer[K, V](kafkaParams.asJava)
> >
> >   // Get offsets for the given topic and the consumer group
> 'storage-group'
> >   val offsetRanges = getOffsets("storage-gro

Re: Instability issues with Spark 2.0.1 and Kafka 0.10

2016-11-04 Thread Cody Koeninger
So just to be clear, the answers to my questions are

- you are not using different group ids, you're using the same group
id everywhere

- you are committing offsets manually

Right?

If you want to eliminate network or kafka misbehavior as a source,
tune poll.ms upwards even higher.

You must use different group ids for different rdds or streams.
Kafka consumers won't behave the way you expect if they are all in the
same group id, and the consumer cache is keyed by group id. Yes, the
executor will tack "spark-executor-" on to the beginning, but if you
give it the same base group id, it will be the same.  And the driver
will use the group id you gave it, unmodified.

Finally, I really can't help you if you're manually writing your own
code to commit offsets directly to Kafka.  Trying to minimize
duplicates that way doesn't really make sense, your system must be
able to handle duplicates if you're using kafka as an offsets store,
it can't do transactional exactly once.

On Fri, Nov 4, 2016 at 1:48 PM, vonnagy  wrote:
> Here are some examples and details of the scenarios. The KafkaRDD is the most
> error prone to polling
> timeouts and concurrentm modification errors.
>
> *Using KafkaRDD* - This takes a list of channels and processes them in
> parallel using the KafkaRDD directly. they all use the same consumer group
> ('storage-group'), but each has it's own topic and each topic has 4
> partitions. We routinely get timeout errors when polling for data. This
> occurs whether we process in parallel or sequentially.
>
> *Spark Kafka setting:*
> spark.streaming.kafka.consumer.poll.ms=2000
>
> *Kafka Consumer Params:*
> metric.reporters = []
> metadata.max.age.ms = 30
> partition.assignment.strategy =
> [org.apache.kafka.clients.consumer.RangeAssignor]
> reconnect.backoff.ms = 50
> sasl.kerberos.ticket.renew.window.factor = 0.8
> max.partition.fetch.bytes = 1048576
> bootstrap.servers = [somemachine:31000]
> ssl.keystore.type = JKS
> enable.auto.commit = false
> sasl.mechanism = GSSAPI
> interceptor.classes = null
> exclude.internal.topics = true
> ssl.truststore.password = null
> client.id =
> ssl.endpoint.identification.algorithm = null
> max.poll.records = 1000
> check.crcs = true
> request.timeout.ms = 4
> heartbeat.interval.ms = 3000
> auto.commit.interval.ms = 5000
> receive.buffer.bytes = 65536
> ssl.truststore.type = JKS
> ssl.truststore.location = null
> ssl.keystore.password = null
> fetch.min.bytes = 1
> send.buffer.bytes = 131072
> value.deserializer = class
> com.vadio.analytics.spark.storage.ClientEventJsonOptionDeserializer
> group.id = storage-group
> retry.backoff.ms = 100
> sasl.kerberos.kinit.cmd = /usr/bin/kinit
> sasl.kerberos.service.name = null
> sasl.kerberos.ticket.renew.jitter = 0.05
> ssl.trustmanager.algorithm = PKIX
> ssl.key.password = null
> fetch.max.wait.ms = 500
> sasl.kerberos.min.time.before.relogin = 6
> connections.max.idle.ms = 54
> session.timeout.ms = 3
> metrics.num.samples = 2
> key.deserializer = class
> org.apache.kafka.common.serialization.StringDeserializer
> ssl.protocol = TLS
> ssl.provider = null
> ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
> ssl.keystore.location = null
> ssl.cipher.suites = null
> security.protocol = PLAINTEXT
> ssl.keymanager.algorithm = SunX509
> metrics.sample.window.ms = 3
> auto.offset.reset = earliest
>
> *Example usage with KafkaRDD:*
> val channels = Seq("channel1", "channel2")
>
> channels.toParArray.foreach { channel =>
>   val consumer = new KafkaConsumer[K, V](kafkaParams.asJava)
>
>   // Get offsets for the given topic and the consumer group 'storage-group'
>   val offsetRanges = getOffsets("storage-group", channel)
>
>   val ds = KafkaUtils.createRDD[K, V](context,
> kafkaParams asJava,
> offsetRanges,
> PreferConsistent).toDS[V]
>
>   // Do some aggregations
>   ds.agg(...)
>   // Save the data
>   ds.write.mode(SaveMode.Append).parquet(somePath)
>   // Save offsets using a KafkaConsumer
>   consumer.commitSync(newOffsets.asJava)
>   consumer.close()
> }
>
>
> *Example usage with Kafka Stream:*
> This creates a stream and processes events in each partition. At the end of
> processing for
> each partition, we updated the offsets for each partition. This is
> challenging to do, but is better
> then calling commitAysnc on the stream, because that occurs after the
> /entire/ RDD has been
> processed. This method minimizes duplicates in an exactly once environment.
> Since the executors
> use their own custom group "spark-executor-processor-group" and the commit
> is buried in private
> functions we are unable to use the executors cached consumer to update the
> offsets. This requires us
> to go through multiple steps to update the Kafka offsets accordingly.
>
> val offsetRanges = getOffsets("processor-group", "my-topic")
>
> val stream = KafkaUtils.createDirectStream[K, V](context,
>   PreferConsistent,
>   Subscribe[K, V](Seq("my-topic") asJavaCollection,
> kafk

Re: Confusion SparkSQL DataFrame OrderBy followed by GroupBY

2016-11-04 Thread Koert Kuipers
okay i see the partition local sort. got it.

i would expect that pushing the partition local sort into shuffle would
give a signficicant boost. but thats just a guess.

On Fri, Nov 4, 2016 at 2:39 PM, Michael Armbrust 
wrote:

> sure, but then my values are not sorted per key, right?
>
>
> It does do a partition local sort. Look at the query plan in my example
> .
> The code here will also take care of finding the boundaries and is pretty
> careful to spill / avoid materializing unnecessarily.
>
> I think you are correct though that we are not pushing any of the sort
> into the shuffle.  I'm not sure how much that buys you.  If its a lot we
> could extend the planner to look for Exchange->Sort pairs and change the
> exchange.
>
> On Fri, Nov 4, 2016 at 7:06 AM, Koert Kuipers  wrote:
>
>> i just noticed Sort for Dataset has a global flag. and Dataset also has
>> sortWithinPartitions.
>>
>> how about:
>> repartition + sortWithinPartitions + mapPartitions?
>>
>> the plan looks ok, but it is not clear to me if the sort is done as part
>> of the shuffle (which is the important optimization).
>>
>> scala> val df = Seq((1, "1"), (2, "2"), (1, "1"), (2, "2")).toDF("key",
>> "value")
>>
>> scala> df.repartition(2, 
>> col("key")).sortWithinPartitions("value").as[(String,
>> String)].mapPartitions{ (x: Iterator[(String, String)]) => x }.explain
>> == Physical Plan ==
>> *SerializeFromObject [staticinvoke(class 
>> org.apache.spark.unsafe.types.UTF8String,
>> StringType, fromString, assertnotnull(input[0, scala.Tuple2, true], top
>> level non-flat input object)._1, true) AS _1#39, staticinvoke(class
>> org.apache.spark.unsafe.types.UTF8String, StringType, fromString,
>> assertnotnull(input[0, scala.Tuple2, true], top level non-flat input
>> object)._2, true) AS _2#40]
>> +- MapPartitions , obj#38: scala.Tuple2
>>+- DeserializeToObject newInstance(class scala.Tuple2), obj#37:
>> scala.Tuple2
>>   +- *Sort [value#6 ASC], false, 0
>>  +- Exchange hashpartitioning(key#5, 2)
>> +- LocalTableScan [key#5, value#6]
>>
>>
>>
>>
>> On Fri, Nov 4, 2016 at 9:18 AM, Koert Kuipers  wrote:
>>
>>> sure, but then my values are not sorted per key, right?
>>>
>>> so a group by key with values sorted according to to some ordering is an
>>> operation that can be done efficiently in a single shuffle without first
>>> figuring out range boundaries. and it is needed for quite a few algos,
>>> including Window and lots of timeseries stuff. but it seems there is no way
>>> to express i want to do this yet (at least not in an efficient way).
>>>
>>> which makes me wonder, what does Window do?
>>>
>>>
>>> On Fri, Nov 4, 2016 at 12:59 AM, Michael Armbrust <
>>> mich...@databricks.com> wrote:
>>>
 Thinking out loud is good :)

 You are right in that anytime you ask for a global ordering from Spark
 you will pay the cost of figuring out the range boundaries for partitions.
 If you say orderBy, though, we aren't sure that you aren't expecting a
 global order.

 If you only want to make sure that items are colocated, it is cheaper
 to do a groupByKey followed by a flatMapGroups
 
 .



 On Thu, Nov 3, 2016 at 7:31 PM, Koert Kuipers 
 wrote:

> i guess i could sort by (hashcode(key), key, secondarySortColumn) and
> then do mapPartitions?
>
> sorry thinking out loud a bit here. ok i think that could work. thanks
>
> On Thu, Nov 3, 2016 at 10:25 PM, Koert Kuipers 
> wrote:
>
>> thats an interesting thought about orderBy and mapPartitions. i guess
>> i could emulate a groupBy with secondary sort using those two. however
>> isn't using an orderBy expensive since it is a total sort? i mean a 
>> groupBy
>> with secondary sort is also a total sort under the hood, but its on
>> (hashCode(key), secondarySortColumn) which is easier to distribute and
>> therefore can be implemented more efficiently.
>>
>>
>>
>>
>>
>> On Thu, Nov 3, 2016 at 8:59 PM, Michael Armbrust <
>> mich...@databricks.com> wrote:
>>
>>> It is still unclear to me why we should remember all these tricks
 (or add lots of extra little functions) when this elegantly can be
 expressed in a reduce operation with a simple one line lamba function.

>>> I think you can do that too.  KeyValueGroupedDataset has a
>>> reduceGroups function.  This probably won't be as fast though because 
>>> you
>>> end up creating objects where as the version I gave will get codgened to
>>> operate on binary data the whole way though.
>>>
 The same applies to 

Re: Clustering Webpages using KMean and Spark Apis : GC limit exceed.

2016-11-04 Thread Reth RM
Called the repartition method at line 2 in the above code, and the error is
no more reported.
JavaRDD> terms = getContent(input).repartition(10);

But I am curious if this is correct approach and for any inputs/suggestions
towards optimization of the above code?



On Fri, Nov 4, 2016 at 11:13 AM, Reth RM  wrote:

> Hi,
>
>  Can you please guide me through parallelizing the task of extracting
> webpages text, converting text to doc vectors and finally applying k-mean.
> I get a  "GC overhead limit exceeded at java.util.Arrays.copyOfRange" at
> task 3 below.  detail stack trace : https://jpst.it/P33P
>
> Right now webpage files are 100k. Current approach:  1) I am using
> wholeTextFiles apis to load the 1M webpages, 2) PairRDD to extract content
> and convert to tokens.  4) Passing this array to convert to doc-vectors and
> finally passing vectors to Kmean. 5) Running job spark-submit,
> standalone, ./spark-submit --master spark://host:7077 --executor-memory 4g
> --driver-memory 4g --class sfsu.spark.main.webmain
> /clustering-1.0-SNAPSHOT.jar
>
> Code snippet as below, I think I should parallelize task 3 or I am doing
> something really wrong, could you please point me to mistakes here?
>
>
> 1. JavaPairRDD input = sc.wholeTextFiles(webFilesPath);
>
> 2. JavaRDD> terms = getContent(input);
>
> 3. public JavaRDD> getContent(JavaPairRDD input) 
> {
> return input.map(new Function, List>() {
> public List call(Tuple2 tuple) throws 
> Exception {
> return Arrays.asList(tuple._2().replaceAll("[^A-Za-z']+", " 
> ").trim().toLowerCase().split("\\W+"));
> }
> });
> }
>
> 4. JavaRDD tfVectors = tf.transform(terms).cache();
>
> 5. KMeansModel model = train(vectors, kMeanProperties);
>
>


Re: Instability issues with Spark 2.0.1 and Kafka 0.10

2016-11-04 Thread vonnagy
Here are some examples and details of the scenarios. The KafkaRDD is the most
error prone to polling 
timeouts and concurrentm modification errors.

*Using KafkaRDD* - This takes a list of channels and processes them in
parallel using the KafkaRDD directly. they all use the same consumer group
('storage-group'), but each has it's own topic and each topic has 4
partitions. We routinely get timeout errors when polling for data. This
occurs whether we process in parallel or sequentially. 

*Spark Kafka setting:*
spark.streaming.kafka.consumer.poll.ms=2000

*Kafka Consumer Params:*
metric.reporters = []
metadata.max.age.ms = 30
partition.assignment.strategy =
[org.apache.kafka.clients.consumer.RangeAssignor]
reconnect.backoff.ms = 50
sasl.kerberos.ticket.renew.window.factor = 0.8
max.partition.fetch.bytes = 1048576
bootstrap.servers = [somemachine:31000]
ssl.keystore.type = JKS
enable.auto.commit = false
sasl.mechanism = GSSAPI
interceptor.classes = null
exclude.internal.topics = true
ssl.truststore.password = null
client.id = 
ssl.endpoint.identification.algorithm = null
max.poll.records = 1000
check.crcs = true
request.timeout.ms = 4
heartbeat.interval.ms = 3000
auto.commit.interval.ms = 5000
receive.buffer.bytes = 65536
ssl.truststore.type = JKS
ssl.truststore.location = null
ssl.keystore.password = null
fetch.min.bytes = 1
send.buffer.bytes = 131072
value.deserializer = class
com.vadio.analytics.spark.storage.ClientEventJsonOptionDeserializer
group.id = storage-group
retry.backoff.ms = 100
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
ssl.trustmanager.algorithm = PKIX
ssl.key.password = null
fetch.max.wait.ms = 500
sasl.kerberos.min.time.before.relogin = 6
connections.max.idle.ms = 54
session.timeout.ms = 3
metrics.num.samples = 2
key.deserializer = class
org.apache.kafka.common.serialization.StringDeserializer
ssl.protocol = TLS
ssl.provider = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.keystore.location = null
ssl.cipher.suites = null
security.protocol = PLAINTEXT
ssl.keymanager.algorithm = SunX509
metrics.sample.window.ms = 3
auto.offset.reset = earliest

*Example usage with KafkaRDD:*
val channels = Seq("channel1", "channel2")

channels.toParArray.foreach { channel =>
  val consumer = new KafkaConsumer[K, V](kafkaParams.asJava)

  // Get offsets for the given topic and the consumer group 'storage-group'
  val offsetRanges = getOffsets("storage-group", channel)

  val ds = KafkaUtils.createRDD[K, V](context,
kafkaParams asJava,
offsetRanges,
PreferConsistent).toDS[V]

  // Do some aggregations
  ds.agg(...)
  // Save the data
  ds.write.mode(SaveMode.Append).parquet(somePath)
  // Save offsets using a KafkaConsumer
  consumer.commitSync(newOffsets.asJava)
  consumer.close()
}


*Example usage with Kafka Stream:*
This creates a stream and processes events in each partition. At the end of
processing for
each partition, we updated the offsets for each partition. This is
challenging to do, but is better
then calling commitAysnc on the stream, because that occurs after the
/entire/ RDD has been 
processed. This method minimizes duplicates in an exactly once environment.
Since the executors 
use their own custom group "spark-executor-processor-group" and the commit
is buried in private 
functions we are unable to use the executors cached consumer to update the
offsets. This requires us
to go through multiple steps to update the Kafka offsets accordingly.

val offsetRanges = getOffsets("processor-group", "my-topic")

val stream = KafkaUtils.createDirectStream[K, V](context,
  PreferConsistent,
  Subscribe[K, V](Seq("my-topic") asJavaCollection,
kafkaParams,
offsetRanges))

stream.foreachRDD { rdd =>
val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

// Transform our data
   rdd.foreachPartition { events =>
   // Establish a consumer in the executor so we can update offsets
after each partition.
   // This class is homegrown and uses the KafkaConsumer to help get/set
offsets
   val consumer = new ConsumerUtils(kafkaParams)
   // do something with our data
   
   // Write the offsets that were updated in this partition 
   kafkaConsumer.setConsumerOffsets("processor-group",
  Map(TopicAndPartition(tp.topic, tp.partition) -> endOffset))
   }
}



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Instability-issues-with-Spark-2-0-1-and-Kafka-0-10-tp28017p28020.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Confusion SparkSQL DataFrame OrderBy followed by GroupBY

2016-11-04 Thread Michael Armbrust
>
> sure, but then my values are not sorted per key, right?


It does do a partition local sort. Look at the query plan in my example
.
The code here will also take care of finding the boundaries and is pretty
careful to spill / avoid materializing unnecessarily.

I think you are correct though that we are not pushing any of the sort into
the shuffle.  I'm not sure how much that buys you.  If its a lot we could
extend the planner to look for Exchange->Sort pairs and change the exchange.

On Fri, Nov 4, 2016 at 7:06 AM, Koert Kuipers  wrote:

> i just noticed Sort for Dataset has a global flag. and Dataset also has
> sortWithinPartitions.
>
> how about:
> repartition + sortWithinPartitions + mapPartitions?
>
> the plan looks ok, but it is not clear to me if the sort is done as part
> of the shuffle (which is the important optimization).
>
> scala> val df = Seq((1, "1"), (2, "2"), (1, "1"), (2, "2")).toDF("key",
> "value")
>
> scala> df.repartition(2, col("key")).sortWithinPartitions("value").as[(String,
> String)].mapPartitions{ (x: Iterator[(String, String)]) => x }.explain
> == Physical Plan ==
> *SerializeFromObject [staticinvoke(class 
> org.apache.spark.unsafe.types.UTF8String,
> StringType, fromString, assertnotnull(input[0, scala.Tuple2, true], top
> level non-flat input object)._1, true) AS _1#39, staticinvoke(class
> org.apache.spark.unsafe.types.UTF8String, StringType, fromString,
> assertnotnull(input[0, scala.Tuple2, true], top level non-flat input
> object)._2, true) AS _2#40]
> +- MapPartitions , obj#38: scala.Tuple2
>+- DeserializeToObject newInstance(class scala.Tuple2), obj#37:
> scala.Tuple2
>   +- *Sort [value#6 ASC], false, 0
>  +- Exchange hashpartitioning(key#5, 2)
> +- LocalTableScan [key#5, value#6]
>
>
>
>
> On Fri, Nov 4, 2016 at 9:18 AM, Koert Kuipers  wrote:
>
>> sure, but then my values are not sorted per key, right?
>>
>> so a group by key with values sorted according to to some ordering is an
>> operation that can be done efficiently in a single shuffle without first
>> figuring out range boundaries. and it is needed for quite a few algos,
>> including Window and lots of timeseries stuff. but it seems there is no way
>> to express i want to do this yet (at least not in an efficient way).
>>
>> which makes me wonder, what does Window do?
>>
>>
>> On Fri, Nov 4, 2016 at 12:59 AM, Michael Armbrust > > wrote:
>>
>>> Thinking out loud is good :)
>>>
>>> You are right in that anytime you ask for a global ordering from Spark
>>> you will pay the cost of figuring out the range boundaries for partitions.
>>> If you say orderBy, though, we aren't sure that you aren't expecting a
>>> global order.
>>>
>>> If you only want to make sure that items are colocated, it is cheaper to
>>> do a groupByKey followed by a flatMapGroups
>>> 
>>> .
>>>
>>>
>>>
>>> On Thu, Nov 3, 2016 at 7:31 PM, Koert Kuipers  wrote:
>>>
 i guess i could sort by (hashcode(key), key, secondarySortColumn) and
 then do mapPartitions?

 sorry thinking out loud a bit here. ok i think that could work. thanks

 On Thu, Nov 3, 2016 at 10:25 PM, Koert Kuipers 
 wrote:

> thats an interesting thought about orderBy and mapPartitions. i guess
> i could emulate a groupBy with secondary sort using those two. however
> isn't using an orderBy expensive since it is a total sort? i mean a 
> groupBy
> with secondary sort is also a total sort under the hood, but its on
> (hashCode(key), secondarySortColumn) which is easier to distribute and
> therefore can be implemented more efficiently.
>
>
>
>
>
> On Thu, Nov 3, 2016 at 8:59 PM, Michael Armbrust <
> mich...@databricks.com> wrote:
>
>> It is still unclear to me why we should remember all these tricks (or
>>> add lots of extra little functions) when this elegantly can be 
>>> expressed in
>>> a reduce operation with a simple one line lamba function.
>>>
>> I think you can do that too.  KeyValueGroupedDataset has a
>> reduceGroups function.  This probably won't be as fast though because you
>> end up creating objects where as the version I gave will get codgened to
>> operate on binary data the whole way though.
>>
>>> The same applies to these Window functions. I had to read it 3 times
>>> to understand what it all means. Maybe it makes sense for someone who 
>>> has
>>> been forced to use such limited tools in sql for many years but that's 
>>> not
>>> necessary what we should aim for. Why can I not just have the sortBy and
>>> then an Iterator[X] => Iterator[Y] to express what I want t

Clustering Webpages using KMean and Spark Apis : GC limit exceed.

2016-11-04 Thread Reth RM
Hi,

 Can you please guide me through parallelizing the task of extracting
webpages text, converting text to doc vectors and finally applying k-mean.
I get a  "GC overhead limit exceeded at java.util.Arrays.copyOfRange" at
task 3 below.  detail stack trace : https://jpst.it/P33P

Right now webpage files are 100k. Current approach:  1) I am using
wholeTextFiles apis to load the 1M webpages, 2) PairRDD to extract content
and convert to tokens.  4) Passing this array to convert to doc-vectors and
finally passing vectors to Kmean. 5) Running job spark-submit,
standalone, ./spark-submit --master spark://host:7077 --executor-memory 4g
--driver-memory 4g --class sfsu.spark.main.webmain
/clustering-1.0-SNAPSHOT.jar

Code snippet as below, I think I should parallelize task 3 or I am doing
something really wrong, could you please point me to mistakes here?


1. JavaPairRDD input = sc.wholeTextFiles(webFilesPath);

2. JavaRDD> terms = getContent(input);

3. public JavaRDD> getContent(JavaPairRDD input) {
return input.map(new Function, List>() {
public List call(Tuple2 tuple) throws
Exception {
return Arrays.asList(tuple._2().replaceAll("[^A-Za-z']+",
" ").trim().toLowerCase().split("\\W+"));
}
});
}

4. JavaRDD tfVectors = tf.transform(terms).cache();

5. KMeansModel model = train(vectors, kMeanProperties);


SAXParseException while writing to parquet on s3

2016-11-04 Thread lminer
I'm trying to read in some json, infer a schema, and write it out again as
parquet to s3 (s3a). For some reason, about a third of the way through the
writing portion of the run, spark always errors out with the error included
below. I can't find any obvious reasons for the issue: it isn't out of
memory; there are no long GC pauses. There don't seem to be any additional
error messages in the logs of the individual executors.

The script runs fine on another set of data that I have, which is of a very
similar structure, but several orders of magnitude smaller.

I am running spark 2.0.1-hadoop-2.7 and am using the FileOutputCommitter.
The algorithm version doesn't seem to matter.

This does not appear to be a problem in badly formed json or corrupted
files. I have unzipped and read in each file individually with no error.

Here's a simplified version of the script:

object Foo {

  def parseJson(json: String): Option[Map[String, Any]] = {
if (json == null)
  Some(Map())
else
  parseOpt(json).map((j: JValue) =>
j.values.asInstanceOf[Map[String, Any]])
  }
  }
}

// read in as text and parse json using json4s
val jsonRDD: RDD[String] = sc.textFile(inputPath)
.map(row -> Foo.parseJson(row))

// infer a schema that will encapsulate the most rows in a sample of
size sampleRowNum
val schema: StructType = Infer.getMostCommonSchema(sc, jsonRDD,
sampleRowNum)

// get documents compatibility with schema
val jsonWithCompatibilityRDD: RDD[(String, Boolean)] = jsonRDD
  .map(js => (js, Infer.getSchemaCompatibility(schema,
Infer.inferSchema(js)).toBoolean))
  .repartition(partitions)

val jsonCompatibleRDD: RDD[String] = jsonWithCompatibilityRDD
  .filter { case (js: String, compatible: Boolean) => compatible }
  .map { case (js: String, _: Boolean) => js }

// create a dataframe from documents with compatible schema
val dataFrame: DataFrame =
spark.read.schema(schema).json(jsonCompatibleRDD)

It completes the earlier schema inferring steps successfully. The error
itself occurs on the last line, but I suppose that could encompass at least
the immediately preceding statemnt, if not earlier:

org.apache.spark.SparkException: Task failed while writing rows
at
org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:261)
at
org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143)
at
org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143)
at
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
at org.apache.spark.scheduler.Task.run(Task.scala:86)
at
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: Failed to commit task
at
org.apache.spark.sql.execution.datasources.DefaultWriterContainer.org$apache$spark$sql$execution$datasources$DefaultWriterContainer$$commitTask$1(WriterContainer.scala:275)
at
org.apache.spark.sql.execution.datasources.DefaultWriterContainer$$anonfun$writeRows$1.apply$mcV$sp(WriterContainer.scala:257)
at
org.apache.spark.sql.execution.datasources.DefaultWriterContainer$$anonfun$writeRows$1.apply(WriterContainer.scala:252)
at
org.apache.spark.sql.execution.datasources.DefaultWriterContainer$$anonfun$writeRows$1.apply(WriterContainer.scala:252)
at
org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1345)
at
org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:258)
... 8 more
Suppressed: java.lang.NullPointerException
at
org.apache.parquet.hadoop.InternalParquetRecordWriter.flushRowGroupToStore(InternalParquetRecordWriter.java:147)
at
org.apache.parquet.hadoop.InternalParquetRecordWriter.close(InternalParquetRecordWriter.java:113)
at
org.apache.parquet.hadoop.ParquetRecordWriter.close(ParquetRecordWriter.java:112)
at
org.apache.spark.sql.execution.datasources.parquet.ParquetOutputWriter.close(ParquetFileFormat.scala:569)
at
org.apache.spark.sql.execution.datasources.DefaultWriterContainer.org$apache$spark$sql$execution$datasources$DefaultWriterContainer$$abortTask$1(WriterContainer.scala:282)
at
org.apache.spark.sql.execution.datasources.DefaultWriterContainer$$anonfun$writeRows$2.apply$mcV$sp(WriterContainer.scala:258)
at
org.apache.spark.util.Utils$.tryWithSafeFinallyAnd

Re: Error creating SparkSession, in IntelliJ

2016-11-04 Thread shyla deshpande
I have built many projects using IntellJ, maven using prior version of
Spark. If anyone has a successful project with Kafka, Spark 2.0.1,
Cassandra, Please share the pom.xml file.
Thanks
-S

On Thu, Nov 3, 2016 at 10:03 PM, Hyukjin Kwon  wrote:

> Hi Shyla,
>
> there is the documentation for setting up IDE - https://cwiki.apache.org/
> confluence/display/SPARK/Useful+Developer+Tools#
> UsefulDeveloperTools-IDESetup
>
> I hope this is helpful.
>
>
> 2016-11-04 9:10 GMT+09:00 shyla deshpande :
>
>> Hello Everyone,
>>
>> I just installed Spark 2.0.1, spark shell works fine.
>>
>> Was able to run some simple programs from the Spark Shell, but find it
>> hard to make the same program work when using IntelliJ.
>>  I am getting the following error.
>>
>> Exception in thread "main" java.lang.NoSuchMethodError:
>> scala.Predef$.$scope()Lscala/xml/TopScope$;
>> at org.apache.spark.ui.jobs.StagePage.(StagePage.scala:44)
>> at org.apache.spark.ui.jobs.StagesTab.(StagesTab.scala:34)
>> at org.apache.spark.ui.SparkUI.(SparkUI.scala:62)
>> at org.apache.spark.ui.SparkUI$.create(SparkUI.scala:215)
>> at org.apache.spark.ui.SparkUI$.createLiveUI(SparkUI.scala:157)
>> at org.apache.spark.SparkContext.(SparkContext.scala:440)
>> at org.apache.spark.SparkContext$.getOrCreate(SparkContext.scala:2275)
>> at org.apache.spark.sql.SparkSession$Builder$$anonfun$8.apply(
>> SparkSession.scala:831)
>> at org.apache.spark.sql.SparkSession$Builder$$anonfun$8.apply(
>> SparkSession.scala:823)
>> at scala.Option.getOrElse(Option.scala:121)
>> at org.apache.spark.sql.SparkSession$Builder.getOrCreate(
>> SparkSession.scala:823)
>> at SparkSessionTest.DataSetWordCount$.main(DataSetWordCount.scala:15)
>> at SparkSessionTest.DataSetWordCount.main(DataSetWordCount.scala)
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAcce
>> ssorImpl.java:62)
>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMe
>> thodAccessorImpl.java:43)
>> at java.lang.reflect.Method.invoke(Method.java:498)
>> at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)
>>
>> Thanks
>> -Shyla
>>
>>
>>
>


Re: Instability issues with Spark 2.0.1 and Kafka 0.10

2016-11-04 Thread Cody Koeninger
- are you using different group ids for the different streams?
- are you manually committing offsets?
- what are the values of your kafka-related settings?

On Fri, Nov 4, 2016 at 12:20 PM, vonnagy  wrote:
> I am getting the issues using Spark 2.0.1 and Kafka 0.10. I have two jobs,
> one that uses a Kafka stream and one that uses just the KafkaRDD.
>
> With the KafkaRDD, I continually get the "Failed to get records .. after
> polling". I have adjusted the polling with
> `spark.streaming.kafka.consumer.poll.ms` and the size of records with
> Kafka's `max.poll.records`. Even when it gets records it is extremely slow.
>
> When working with multiple KafkaRDDs in parallel I get the dreaded
> `ConcurrentModificationException`. The Spark logic is supposed to use a
> CachedKafkaConsumer based on the topic and partition. This is supposed to
> guarantee thread safety, but I continually get this error along with the
> polling timeout.
>
> Has anyone else tried to use Spark 2 with Kafka 0.10 and had any success. At
> this point it is completely useless in my experience. With Spark 1.6 and
> Kafka 0.8.x, I never had these problems.
>
>
>
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/Instability-issues-with-Spark-2-0-1-and-Kafka-0-10-tp28017.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Delegation Token renewal in yarn-cluster

2016-11-04 Thread Marcelo Vanzin
On Fri, Nov 4, 2016 at 1:57 AM, Zsolt Tóth  wrote:
> This was what confused me in the first place. Why does Spark ask for new
> tokens based on the renew-interval instead of the max-lifetime?

It could be just a harmless bug, since tokens have a "getMaxDate()"
method which I assume returns the token's lifetime, although there's
no documentation. Or it could be that the max lifetime of the token is
not really available to the code. If you want to experiment with the
code, that should be a small change (if getMaxDate() returns the right
thing).

-- 
Marcelo

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Instability issues with Spark 2.0.1 and Kafka 0.10

2016-11-04 Thread vonnagy
I am getting the issues using Spark 2.0.1 and Kafka 0.10. I have two jobs,
one that uses a Kafka stream and one that uses just the KafkaRDD. 

With the KafkaRDD, I continually get the "Failed to get records .. after
polling". I have adjusted the polling with
`spark.streaming.kafka.consumer.poll.ms` and the size of records with
Kafka's `max.poll.records`. Even when it gets records it is extremely slow. 

When working with multiple KafkaRDDs in parallel I get the dreaded
`ConcurrentModificationException`. The Spark logic is supposed to use a
CachedKafkaConsumer based on the topic and partition. This is supposed to
guarantee thread safety, but I continually get this error along with the
polling timeout. 

Has anyone else tried to use Spark 2 with Kafka 0.10 and had any success. At
this point it is completely useless in my experience. With Spark 1.6 and
Kafka 0.8.x, I never had these problems.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Instability-issues-with-Spark-2-0-1-and-Kafka-0-10-tp28017.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Deep learning libraries for scala

2016-11-04 Thread Masood Krohy
If you need ConvNets and RNNs and want to stay in Scala/Java, then Deep 
Learning for Java (DL4J) might be the most mature option.

If you want  ConvNets and RNNs, as implemented in TensorFlow, along with 
all the bells and whistles, then you might want to switch to PySpark + 
TensorFlow and write the entire pipeline in Python. You'd do the data 
preparation/ingestion in PySpark and pass the data to TensorFlow for the 
ML part. There are 2 supported modes here:
1) Simultaneous multi-model training (a.k.a. embarrassingly parallel: each 
node has the entire data and model):
https://databricks.com/blog/2016/01/25/deep-learning-with-apache-spark-and-tensorflow.html

2) Data parallelism (data is distributed, each node has the entire model): 

There are some prototypes out there and TensorSpark seems to be most 
mature: https://github.com/adatao/tensorspark
It implements Downpour/Asynchronous SGD for the distributed training; it 
remains to be stress-tested with large datasets, however.
More info: 
https://arimo.com/machine-learning/deep-learning/2016/arimo-distributed-tensorflow-on-spark/

TensorFrames does not allow distributed training and I did not see any 
performance benchmarks last time I checked.

Alexander Ulanov of HP made a presentation of the options few months ago:
https://www.oreilly.com/learning/distributed-deep-learning-on-spark

Masood


--
Masood Krohy, Ph.D.
Data Scientist, Intact Lab-R&D
Intact Financial Corporation




De :Benjamin Kim 
A : janardhan shetty 
Cc :Gourav Sengupta , user 

Date :  2016-11-01 13:14
Objet : Re: Deep learning libraries for scala



To add, I see that Databricks has been busy integrating deep learning more 
into their product and put out a new article about this.

https://databricks.com/blog/2016/10/27/gpu-acceleration-in-databricks.html

An interesting tidbit is at the bottom of the article mentioning 
TensorFrames.

https://github.com/databricks/tensorframes

Seems like an interesting direction…

Cheers,
Ben


On Oct 19, 2016, at 9:05 AM, janardhan shetty  
wrote:

Agreed. But as it states deeper integration with (scala) is yet to be 
developed. 
Any thoughts on how to use tensorflow with scala ? Need to write wrappers 
I think. 

On Oct 19, 2016 7:56 AM, "Benjamin Kim"  wrote:
On that note, here is an article that Databricks made regarding using 
Tensorflow in conjunction with Spark.

https://databricks.com/blog/2016/01/25/deep-learning-with-apache-spark-and-tensorflow.html

Cheers,
Ben


On Oct 19, 2016, at 3:09 AM, Gourav Sengupta  
wrote:

while using Deep Learning you might want to stay as close to tensorflow as 
possible. There is very less translation loss, you get to access stable, 
scalable and tested libraries from the best brains in the industry and as 
far as Scala goes, it helps a lot to think about using the language as a 
tool to access algorithms in this instance unless you want to start 
developing algorithms from grounds up ( and in which case you might not 
require any libraries at all).

On Sat, Oct 1, 2016 at 3:30 AM, janardhan shetty  
wrote:
Hi,

Are there any good libraries which can be used for scala deep learning 
models ?
How can we integrate tensorflow with scala ML ?







Re: expected behavior of Kafka dynamic topic subscription

2016-11-04 Thread Cody Koeninger
That's not what I would expect from the underlying kafka consumer, no.

But this particular case (no matching topics, then add a topic after
SubscribePattern stream starts) actually isn't part of unit tests for
either the DStream or the structured stream.

I'll make a jira ticket.

On Thu, Nov 3, 2016 at 9:43 PM, Haopu Wang  wrote:
> I'm using Kafka010 integration API to create a DStream using
> SubscriberPattern ConsumerStrategy.
>
> The specified topic doesn't exist when I start the application.
>
> Then I create the topic and publish some test messages. I can see them in
> the console subscriber.
>
> But the spark application doesn't seem to get the messages.
>
> I think this is not expected, right? What should I check to resolve it?
>
> Thank you very much!

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Confusion SparkSQL DataFrame OrderBy followed by GroupBY

2016-11-04 Thread Koert Kuipers
i just noticed Sort for Dataset has a global flag. and Dataset also has
sortWithinPartitions.

how about:
repartition + sortWithinPartitions + mapPartitions?

the plan looks ok, but it is not clear to me if the sort is done as part of
the shuffle (which is the important optimization).

scala> val df = Seq((1, "1"), (2, "2"), (1, "1"), (2, "2")).toDF("key",
"value")

scala> df.repartition(2,
col("key")).sortWithinPartitions("value").as[(String,
String)].mapPartitions{ (x: Iterator[(String, String)]) => x }.explain
== Physical Plan ==
*SerializeFromObject [staticinvoke(class
org.apache.spark.unsafe.types.UTF8String, StringType, fromString,
assertnotnull(input[0, scala.Tuple2, true], top level non-flat input
object)._1, true) AS _1#39, staticinvoke(class
org.apache.spark.unsafe.types.UTF8String, StringType, fromString,
assertnotnull(input[0, scala.Tuple2, true], top level non-flat input
object)._2, true) AS _2#40]
+- MapPartitions , obj#38: scala.Tuple2
   +- DeserializeToObject newInstance(class scala.Tuple2), obj#37:
scala.Tuple2
  +- *Sort [value#6 ASC], false, 0
 +- Exchange hashpartitioning(key#5, 2)
+- LocalTableScan [key#5, value#6]




On Fri, Nov 4, 2016 at 9:18 AM, Koert Kuipers  wrote:

> sure, but then my values are not sorted per key, right?
>
> so a group by key with values sorted according to to some ordering is an
> operation that can be done efficiently in a single shuffle without first
> figuring out range boundaries. and it is needed for quite a few algos,
> including Window and lots of timeseries stuff. but it seems there is no way
> to express i want to do this yet (at least not in an efficient way).
>
> which makes me wonder, what does Window do?
>
>
> On Fri, Nov 4, 2016 at 12:59 AM, Michael Armbrust 
> wrote:
>
>> Thinking out loud is good :)
>>
>> You are right in that anytime you ask for a global ordering from Spark
>> you will pay the cost of figuring out the range boundaries for partitions.
>> If you say orderBy, though, we aren't sure that you aren't expecting a
>> global order.
>>
>> If you only want to make sure that items are colocated, it is cheaper to
>> do a groupByKey followed by a flatMapGroups
>> 
>> .
>>
>>
>>
>> On Thu, Nov 3, 2016 at 7:31 PM, Koert Kuipers  wrote:
>>
>>> i guess i could sort by (hashcode(key), key, secondarySortColumn) and
>>> then do mapPartitions?
>>>
>>> sorry thinking out loud a bit here. ok i think that could work. thanks
>>>
>>> On Thu, Nov 3, 2016 at 10:25 PM, Koert Kuipers 
>>> wrote:
>>>
 thats an interesting thought about orderBy and mapPartitions. i guess i
 could emulate a groupBy with secondary sort using those two. however isn't
 using an orderBy expensive since it is a total sort? i mean a groupBy with
 secondary sort is also a total sort under the hood, but its on
 (hashCode(key), secondarySortColumn) which is easier to distribute and
 therefore can be implemented more efficiently.





 On Thu, Nov 3, 2016 at 8:59 PM, Michael Armbrust <
 mich...@databricks.com> wrote:

> It is still unclear to me why we should remember all these tricks (or
>> add lots of extra little functions) when this elegantly can be expressed 
>> in
>> a reduce operation with a simple one line lamba function.
>>
> I think you can do that too.  KeyValueGroupedDataset has a
> reduceGroups function.  This probably won't be as fast though because you
> end up creating objects where as the version I gave will get codgened to
> operate on binary data the whole way though.
>
>> The same applies to these Window functions. I had to read it 3 times
>> to understand what it all means. Maybe it makes sense for someone who has
>> been forced to use such limited tools in sql for many years but that's 
>> not
>> necessary what we should aim for. Why can I not just have the sortBy and
>> then an Iterator[X] => Iterator[Y] to express what I want to do?
>>
> We also have orderBy and mapPartitions.
>
>> All these functions (rank etc.) can be trivially expressed in this,
>> plus I can add other operations if needed, instead of being locked in 
>> like
>> this Window framework.
>>
>  I agree that window functions would probably not be my first choice
> for many problems, but for people coming from SQL it was a very popular
> feature.  My real goal is to give as many paradigms as possible in a 
> single
> unified framework.  Let people pick the right mode of expression for any
> given job :)
>


>>>
>>
>


Re: How do I convert a data frame to broadcast variable?

2016-11-04 Thread Jain, Nishit
Awesome, thanks Silvio!

From: Silvio Fiorito 
mailto:silvio.fior...@granturing.com>>
Date: Thursday, November 3, 2016 at 12:26 PM
To: "Jain, Nishit" mailto:nja...@underarmour.com>>, 
Denny Lee mailto:denny.g@gmail.com>>, 
"user@spark.apache.org" 
mailto:user@spark.apache.org>>
Subject: Re: How do I convert a data frame to broadcast variable?


Hi Nishit,


Yes the JDBC connector supports predicate pushdown and column pruning. So any 
selection you make on the dataframe will get materialized in the query sent via 
JDBC.


You should be able to verify this by looking at the physical query plan:


val df = sqlContext.jdbc().select($"col1", $"col2")

df.explain(true)


Or if you can easily log queries submitted to your database then you can view 
the specific query.


Thanks,

Silvio


From: Jain, Nishit mailto:nja...@underarmour.com>>
Sent: Thursday, November 3, 2016 12:32:48 PM
To: Denny Lee; user@spark.apache.org
Subject: Re: How do I convert a data frame to broadcast variable?

Thanks Denny! That does help. I will give that a shot.

Question: If I am going this route, I am wondering how can I only read few 
columns of a table (not whole table) from JDBC as data frame.
This function from data frame reader does not give an option to read only 
certain columns:
def jdbc(url: String, table: String, predicates: Array[String], 
connectionProperties: Properties): DataFrame

On the other hand if I want to create a JDBCRdd I can specify a select query 
(instead of full table):
new JdbcRDD(sc: SparkContext, getConnection: () ⇒ Connection, sql: String, 
lowerBound: Long, upperBound: Long, numPartitions: Int, mapRow: (ResultSet) ⇒ T 
= JdbcRDD.resultSetToObjectArray)(implicit arg0: ClassTag[T])

May be if I do df.select(col1, col2)  on data frame created via a table, will 
spark be smart enough to fetch only two columns not entire table?
Any way to test this?


From: Denny Lee mailto:denny.g@gmail.com>>
Date: Thursday, November 3, 2016 at 10:59 AM
To: "Jain, Nishit" mailto:nja...@underarmour.com>>, 
"user@spark.apache.org" 
mailto:user@spark.apache.org>>
Subject: Re: How do I convert a data frame to broadcast variable?

If you're able to read the data in as a DataFrame, perhaps you can use a 
BroadcastHashJoin so that way you can join to that table presuming its small 
enough to distributed?  Here's a handy guide on a BroadcastHashJoin: 
https://docs.cloud.databricks.com/docs/latest/databricks_guide/index.html#04%20SQL,%20DataFrames%20%26%20Datasets/05%20BroadcastHashJoin%20-%20scala.html

HTH!


On Thu, Nov 3, 2016 at 8:53 AM Jain, Nishit 
mailto:nja...@underarmour.com>> wrote:
I have a lookup table in HANA database. I want to create a spark broadcast 
variable for it.
What would be the suggested approach? Should I read it as an data frame and 
convert data frame into broadcast variable?

Thanks,
Nishit


Re: sanboxing spark executors

2016-11-04 Thread Andrew Holway
I think running it on a Mesos cluster could give you better control over
this kinda stuff.


On Fri, Nov 4, 2016 at 7:41 AM, blazespinnaker 
wrote:

> Is there a good method / discussion / documentation on how to sandbox a
> spark
> executor?   Assume the code is untrusted and you don't want it to be able
> to
> make un validated network connections or do unvalidated alluxio/hdfs/file
> io.
>
>
>
>
> --
> View this message in context: http://apache-spark-user-list.
> 1001560.n3.nabble.com/sanboxing-spark-executors-tp28014.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


-- 
Otter Networks UG
http://otternetworks.de
Gotenstraße 17
10829 Berlin


Re: Aggregation Calculation

2016-11-04 Thread Andrés Ivaldi
Ok, so I've read that rollup is just syntactic sugar of GROUPING SET(...),
in that case I just need to use GROUPNG SET, but the examples in the
documentation this GROUPING SET is used with SQL syntaxis and I am doing it
programmatically, so I need the DataSet api, like ds.rollup(..) but for
grouping set,

Does any one knows how to do it?

thanks.



On Thu, Nov 3, 2016 at 5:17 PM, Andrés Ivaldi  wrote:

> I'm not sure about inline views, it will still performing aggregation that
> I don't need. I think I didn't explain right, I've already filtered the
> values that I need, the problem is that default calculation of rollUp give
> me some calculations that I don't want like only aggregation by the second
> column.
> Suppose tree columns (DataSet Columns) Year, Moth, Import, and I want
> aggregation sum(Import), and the combination of all Year/Month Sum(import),
> also Year Sum(import), but Mont Sum(import) doesn't care
>
> in table it will looks like
>
> YEAR | MOTH | Sum(Import)
> 2006 | 1| 
> 2005 | 1| 
> 2005 | 2| 
> 2006 | null | 
> 2005 | null | 
> null | null | 
> null | 1| 
> null | 2| 
>
> the las tree rows are not needed, in this example I could perform
> filtering after rollUp i do the query by demand  so it will grow depending
> on number of rows and columns, and will be a lot of combinations that I
> don't need.
>
> thanks
>
>
>
>
>
> On Thu, Nov 3, 2016 at 4:04 PM, Stephen Boesch  wrote:
>
>> You would likely want to create inline views that perform the filtering 
>> *before
>> *performing t he cubes/rollup; in this way the cubes/rollups only
>> operate on the pruned rows/columns.
>>
>> 2016-11-03 11:29 GMT-07:00 Andrés Ivaldi :
>>
>>> Hello, I need to perform some aggregations and a kind of Cube/RollUp
>>> calculation
>>>
>>> Doing some test looks like Cube and RollUp performs aggregation over all
>>> posible columns combination, but I just need some specific columns
>>> combination.
>>>
>>> What I'm trying to do is like a dataTable where te first N columns are
>>> may rows and the second M values are my columns and the last columna are
>>> the aggregated values, like Dimension / Measures
>>>
>>> I need all the values of the N and M columns and the ones that
>>> correspond to the aggregation function. I'll never need the values that
>>> previous column has no value, ie
>>>
>>> having N=2 so two columns as rows I'll need
>>> R1 | R2  
>>> ##  |  ## 
>>> ##  |   null 
>>>
>>> but not
>>> null | ## 
>>>
>>> as roll up does, same approach to M columns
>>>
>>>
>>> So the question is what could be the better way to perform this
>>> calculation.
>>> Using rollUp/Cube give me a lot of values that I dont need
>>> Using groupBy give me less information ( I could do several groupBy but
>>> that is not performant, I think )
>>> Is any other way to something like that?
>>>
>>> Thanks.
>>>
>>>
>>>
>>>
>>>
>>> --
>>> Ing. Ivaldi Andres
>>>
>>
>>
>
>
> --
> Ing. Ivaldi Andres
>



-- 
Ing. Ivaldi Andres


Re: Confusion SparkSQL DataFrame OrderBy followed by GroupBY

2016-11-04 Thread Koert Kuipers
sure, but then my values are not sorted per key, right?

so a group by key with values sorted according to to some ordering is an
operation that can be done efficiently in a single shuffle without first
figuring out range boundaries. and it is needed for quite a few algos,
including Window and lots of timeseries stuff. but it seems there is no way
to express i want to do this yet (at least not in an efficient way).

which makes me wonder, what does Window do?


On Fri, Nov 4, 2016 at 12:59 AM, Michael Armbrust 
wrote:

> Thinking out loud is good :)
>
> You are right in that anytime you ask for a global ordering from Spark you
> will pay the cost of figuring out the range boundaries for partitions.  If
> you say orderBy, though, we aren't sure that you aren't expecting a
> global order.
>
> If you only want to make sure that items are colocated, it is cheaper to
> do a groupByKey followed by a flatMapGroups
> 
> .
>
>
>
> On Thu, Nov 3, 2016 at 7:31 PM, Koert Kuipers  wrote:
>
>> i guess i could sort by (hashcode(key), key, secondarySortColumn) and
>> then do mapPartitions?
>>
>> sorry thinking out loud a bit here. ok i think that could work. thanks
>>
>> On Thu, Nov 3, 2016 at 10:25 PM, Koert Kuipers  wrote:
>>
>>> thats an interesting thought about orderBy and mapPartitions. i guess i
>>> could emulate a groupBy with secondary sort using those two. however isn't
>>> using an orderBy expensive since it is a total sort? i mean a groupBy with
>>> secondary sort is also a total sort under the hood, but its on
>>> (hashCode(key), secondarySortColumn) which is easier to distribute and
>>> therefore can be implemented more efficiently.
>>>
>>>
>>>
>>>
>>>
>>> On Thu, Nov 3, 2016 at 8:59 PM, Michael Armbrust >> > wrote:
>>>
 It is still unclear to me why we should remember all these tricks (or
> add lots of extra little functions) when this elegantly can be expressed 
> in
> a reduce operation with a simple one line lamba function.
>
 I think you can do that too.  KeyValueGroupedDataset has a
 reduceGroups function.  This probably won't be as fast though because you
 end up creating objects where as the version I gave will get codgened to
 operate on binary data the whole way though.

> The same applies to these Window functions. I had to read it 3 times
> to understand what it all means. Maybe it makes sense for someone who has
> been forced to use such limited tools in sql for many years but that's not
> necessary what we should aim for. Why can I not just have the sortBy and
> then an Iterator[X] => Iterator[Y] to express what I want to do?
>
 We also have orderBy and mapPartitions.

> All these functions (rank etc.) can be trivially expressed in this,
> plus I can add other operations if needed, instead of being locked in like
> this Window framework.
>
  I agree that window functions would probably not be my first choice
 for many problems, but for people coming from SQL it was a very popular
 feature.  My real goal is to give as many paradigms as possible in a single
 unified framework.  Let people pick the right mode of expression for any
 given job :)

>>>
>>>
>>
>


WARN 1 block locks were not released with MLlib ALS

2016-11-04 Thread Mikael Ståldal
I get a few warnings like this in Spark 2.0.1 when using org
.apache.spark.mllib.recommendation.ALS:

WARN  org.apache.spark.executor.Executor - 1 block locks were not released
by TID = 1448:
[rdd_239_0]

What can be the reason for that?


-- 
[image: MagineTV]

*Mikael Ståldal*
Senior software developer

*Magine TV*
mikael.stal...@magine.com
Grev Turegatan 3  | 114 46 Stockholm, Sweden  |   www.magine.com

Privileged and/or Confidential Information may be contained in this
message. If you are not the addressee indicated in this message
(or responsible for delivery of the message to such a person), you may not
copy or deliver this message to anyone. In such case,
you should destroy this message and kindly notify the sender by reply
email.


Re: example LDA code ClassCastException

2016-11-04 Thread Tamas Jambor
thanks for the reply.

Asher, have you experienced problem when checkpoints are not enabled as
well? If we have large number of iterations (over 150) and checkpoints are
not enabled, the process just hangs (without no error) at around iteration
120-140 (on spark 2.0.0). I could not reproduce this outside of our data,
unfortunately.

On Fri, Nov 4, 2016 at 2:53 AM, Asher Krim  wrote:

> There is an open Jira for this issue (https://issues.apache.org/
> jira/browse/SPARK-14804). There have been a few proposed fixes so far.
>
> On Thu, Nov 3, 2016 at 2:20 PM, jamborta  wrote:
>
>> Hi there,
>>
>> I am trying to run the example LDA code
>> (http://spark.apache.org/docs/latest/mllib-clustering.html#l
>> atent-dirichlet-allocation-lda)
>> on Spark 2.0.0/EMR 5.0.0
>>
>> If run it with checkpoints enabled (sc.setCheckpointDir("s3n://s3-path/")
>>
>> ldaModel = LDA.train(corpus, k=3, maxIterations=200,
>> checkpointInterval=10)
>>
>> I get the following error (sorry, quite long):
>>
>> Py4JJavaErrorTraceback (most recent call last)
>>  in ()
>> > 1 ldaModel = LDA.train(corpus, k=3, maxIterations=200,
>> checkpointInterval=10)
>>
>> /usr/lib/spark/python/pyspark/mllib/clustering.py in train(cls, rdd, k,
>> maxIterations, docConcentration, topicConcentration, seed,
>> checkpointInterval, optimizer)
>>1037 model = callMLlibFunc("trainLDAModel", rdd, k,
>> maxIterations,
>>1038   docConcentration,
>> topicConcentration,
>> seed,
>> -> 1039   checkpointInterval, optimizer)
>>1040 return LDAModel(model)
>>1041
>>
>> /usr/lib/spark/python/pyspark/mllib/common.py in callMLlibFunc(name,
>> *args)
>> 128 sc = SparkContext.getOrCreate()
>> 129 api = getattr(sc._jvm.PythonMLLibAPI(), name)
>> --> 130 return callJavaFunc(sc, api, *args)
>> 131
>> 132
>>
>> /usr/lib/spark/python/pyspark/mllib/common.py in callJavaFunc(sc, func,
>> *args)
>> 121 """ Call Java Function """
>> 122 args = [_py2java(sc, a) for a in args]
>> --> 123 return _java2py(sc, func(*args))
>> 124
>> 125
>>
>> /usr/lib/spark/python/lib/py4j-0.10.1-src.zip/py4j/java_gateway.py in
>> __call__(self, *args)
>> 931 answer = self.gateway_client.send_command(command)
>> 932 return_value = get_return_value(
>> --> 933 answer, self.gateway_client, self.target_id,
>> self.name)
>> 934
>> 935 for temp_arg in temp_args:
>>
>> /usr/lib/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
>>  61 def deco(*a, **kw):
>>  62 try:
>> ---> 63 return f(*a, **kw)
>>  64 except py4j.protocol.Py4JJavaError as e:
>>  65 s = e.java_exception.toString()
>>
>> /usr/lib/spark/python/lib/py4j-0.10.1-src.zip/py4j/protocol.py in
>> get_return_value(answer, gateway_client, target_id, name)
>> 310 raise Py4JJavaError(
>> 311 "An error occurred while calling
>> {0}{1}{2}.\n".
>> --> 312 format(target_id, ".", name), value)
>> 313 else:
>> 314 raise Py4JError(
>>
>> Py4JJavaError: An error occurred while calling o115.trainLDAModel.
>> : org.apache.spark.SparkException: Job aborted due to stage failure:
>> Task 1
>> in stage 458.0 failed 4 times, most recent failure: Lost task 1.3 in stage
>> 458.0 (TID 14827, ip-10-197-192-2.eu-west-1.compute.internal):
>> java.lang.ClassCastException: scala.Tuple2 cannot be cast to
>> org.apache.spark.graphx.Edge
>> at
>> org.apache.spark.graphx.EdgeRDD$$anonfun$1$$anonfun$apply$1.
>> apply(EdgeRDD.scala:107)
>> at scala.collection.Iterator$class.foreach(Iterator.scala:893)
>> at
>> org.apache.spark.InterruptibleIterator.foreach(Interruptible
>> Iterator.scala:28)
>> at org.apache.spark.graphx.EdgeRDD$$anonfun$1.apply(EdgeRDD.
>> scala:107)
>> at org.apache.spark.graphx.EdgeRDD$$anonfun$1.apply(EdgeRDD.
>> scala:105)
>> at
>> org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$
>> anonfun$apply$25.apply(RDD.scala:801)
>> at
>> org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$
>> anonfun$apply$25.apply(RDD.scala:801)
>> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsR
>> DD.scala:38)
>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:
>> 319)
>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsR
>> DD.scala:38)
>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:
>> 319)
>> at org.apache.spark.rdd.RDD$$anonfun$8.apply(RDD.scala:332)
>> at org.apache.spark.rdd.RDD$$anonfun$8.apply(RDD.scala:330)
>> at
>> org.apache.spark.storage.BlockManager$$anonfun$doPutIterator
>> $1.apply(BlockManager.scala:919)
>> at
>> org.apache.spark.storage.BlockManager$$anonfun$doP

Re: Delegation Token renewal in yarn-cluster

2016-11-04 Thread Steve Loughran

On 4 Nov 2016, at 01:37, Marcelo Vanzin 
mailto:van...@cloudera.com>> wrote:

On Thu, Nov 3, 2016 at 3:47 PM, Zsolt Tóth 
mailto:toth.zsolt@gmail.com>> wrote:
What is the purpose of the delegation token renewal (the one that is done
automatically by Hadoop libraries, after 1 day by default)? It seems that it
always happens (every day) until the token expires, no matter what. I'd
probably find an answer to that in a basic Hadoop security description.



* DTs allow a long lived job to outlast the Kerberos ticket lifetime of the 
submitter; usually 48-72h.
* submitting jobs with DTs limit the access of the job to those services for 
which you have a DT; no need to acquire Kerberos tickets for every query being 
run. This keeps load on kerberos down, which is good as with Active Directory 
that's usually shared with the rest of the organisation. Some kerberos servers 
treat a bulk access from a few thousand machines as a brute force attack.
* Delegation tokens can also be revoked at the NN. After a process terminates, 
something (YARN NM?) can chat with the NN and say "no longer valid". In 
contrast, Kerberos TGTs stay valid until that timeout, without any revocation 
mechanism.

I'm not sure and I never really got a good answer to that (I had the
same question in the past). My best guess is to limit how long an
attacker can do bad things if he gets hold of a delegation token. But
IMO if an attacker gets a delegation token, that's pretty bad
regardless of how long he can use it...


correct: limits the damage. In contrast, if someone has your keytab, they have 
access until that KT expires.




I have a feeling that giving the keytab to Spark bypasses the concept behind
delegation tokens. As I understand, the NN basically says that "your
application can access hdfs with this delegation token, but only for 7
days".

I'm not sure why there's a 7 day limit either, but let's assume
there's a good reason. Basically the app, at that point, needs to
prove to the NN it has a valid kerberos credential. Whether that's
from someone typing their password into a terminal, or code using a
keytab, it doesn't really matter. If someone was worried about that
user being malicious they'd disable the user's login in the KDC.

This feature is needed because there are apps that need to keep
running, unattended, for longer than HDFS's max lifetime setting.


pretty much it. FWIW that's why turning Kerberos on midweek morning, rather 
than a friday evening, is wise. The 7 day timeout event will start happening 
during working hours.

https://github.com/apache/hadoop/blob/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/YarnApplicationSecurity.md



Re: sanboxing spark executors

2016-11-04 Thread Steve Loughran

> On 4 Nov 2016, at 06:41, blazespinnaker  wrote:
> 
> Is there a good method / discussion / documentation on how to sandbox a spark
> executor?   Assume the code is untrusted and you don't want it to be able to
> make un validated network connections or do unvalidated alluxio/hdfs/file


use Kerberos to auth HDFS connections, HBase, Hive. When enabled spark 
processes (all yarn processes) will run as different users in the cluster for 
isolation there too.

no easy way to monitor/block general outbound network connections though.  

> io.
> 
> 
> 
> 
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/sanboxing-spark-executors-tp28014.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
> 
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> 
> 


-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: LinearRegressionWithSGD and Rank Features By Importance

2016-11-04 Thread Carlo . Allocca
Hi Robin,

On 4 Nov 2016, at 09:19, Robin East 
mailto:robin.e...@xense.co.uk>> wrote:

Hi

Do you mean the test of significance that you usually get with R output?
Yes, exactly.

I don’t think there is anything implemented in the standard MLLib libraries 
however I believe that the sparkR version provides that. See 
http://spark.apache.org/docs/1.6.2/sparkr.html#gaussian-glm-model

Glad to hear that as it means that I m not missing much.

Many Thanks.

Best Regards,
Carlo

---
Robin East
Spark GraphX in Action Michael Malak and Robin East
Manning Publications Co.
http://www.manning.com/books/spark-graphx-in-action





On 4 Nov 2016, at 07:38, Carlo.Allocca 
mailto:carlo.allo...@open.ac.uk>> wrote:

Hi Mohit,

Thank you for your reply.
OK. it means coefficient with high score are more important that other with low 
score…

Many Thanks,
Best Regards,
Carlo


On 3 Nov 2016, at 20:41, Mohit Jaggi 
mailto:mohitja...@gmail.com>> wrote:

For linear regression, it should be fairly easy. Just sort the co-efficients :)

Mohit Jaggi
Founder,
Data Orchard LLC
www.dataorchardllc.com




On Nov 3, 2016, at 3:35 AM, Carlo.Allocca 
mailto:carlo.allo...@open.ac.uk>> wrote:

Hi All,

I am using SPARK and in particular the MLib library.

import org.apache.spark.mllib.regression.LabeledPoint;
import org.apache.spark.mllib.regression.LinearRegressionModel;
import org.apache.spark.mllib.regression.LinearRegressionWithSGD;

For my problem I am using the LinearRegressionWithSGD and I would like to 
perform a “Rank Features By Importance”.

I checked the documentation and it seems that does not provide such methods.

Am I missing anything?  Please, could you provide any help on this?
Should I change the approach?

Many Thanks in advance,

Best Regards,
Carlo


-- The Open University is incorporated by Royal Charter (RC 000391), an exempt 
charity in England & Wales and a charity registered in Scotland (SC 038302). 
The Open University is authorised and regulated by the Financial Conduct 
Authority.

-
To unsubscribe e-mail: 
user-unsubscr...@spark.apache.org




-
To unsubscribe e-mail: 
user-unsubscr...@spark.apache.org





Re: Is Spark launcher's listener API considered production ready?

2016-11-04 Thread Aseem Bansal
Anyone has any idea about this?

On Thu, Nov 3, 2016 at 12:52 PM, Aseem Bansal  wrote:

> While using Spark launcher's listener we came across few cases where the
> failures were not being reported correctly.
>
>
>- https://issues.apache.org/jira/browse/SPARK-17742
>- https://issues.apache.org/jira/browse/SPARK-18241
>
> So just wanted to ensure whether this API considered production ready and
> has anyone used it is production successfully? Has anyone used it? It seems
> that we need to handle the failures ourselves. How did you handle the
> failure cases?
>


InvalidClassException when load KafkaDirectStream from checkpoint (Spark 2.0.0)

2016-11-04 Thread Haopu Wang
When I load spark checkpoint, I get below error. Do you have any idea? 

Much thanks!

 

*

 

2016-11-04 17:12:19,582 INFO
[org.apache.spark.streaming.CheckpointReader] (main;) Checkpoint files
found:
file:/d:/temp/checkpoint/checkpoint-147825070,file:/d:/temp/checkpoi
nt/checkpoint-147825070.bk,file:/d:/temp/checkpoint/checkpoint-14782
5069,file:/d:/temp/checkpoint/checkpoint-147825069.bk,file:/d:/t
emp/checkpoint/checkpoint-147825068,file:/d:/temp/checkpoint/checkpo
int-147825068.bk,file:/d:/temp/checkpoint/checkpoint-147825067,f
ile:/d:/temp/checkpoint/checkpoint-147825067.bk

2016-11-04 17:12:19,584 INFO
[org.apache.spark.streaming.CheckpointReader] (main;) Attempting to load
checkpoint from file file:/d:/temp/checkpoint/checkpoint-147825070

2016-11-04 17:12:19,640 DEBUG [org.apache.spark.streaming.DStreamGraph]
(main;) DStreamGraph.readObject used

2016-11-04 17:12:19,661 DEBUG
[org.apache.spark.streaming.kafka010.DirectKafkaInputDStream] (main;)
DirectKafkaInputDStream.readObject used

2016-11-04 17:12:19,664 DEBUG
[org.apache.spark.streaming.dstream.DStreamCheckpointData] (main;)
DStreamCheckpointData.readObject used

2016-11-04 17:12:19,679 DEBUG
[org.apache.spark.streaming.kafka010.DirectKafkaInputDStream$DirectKafka
InputDStreamCheckpointData] (main;)
DirectKafkaInputDStreamCheckpointData.readObject used

2016-11-04 17:12:19,685 ERROR [org.apache.spark.util.Utils] (main;)
Exception encountered

java.io.InvalidClassException:
scala.collection.convert.Wrappers$MutableMapWrapper; no valid
constructor

 at
java.io.ObjectStreamClass$ExceptionInfo.newInvalidClassException(ObjectS
treamClass.java:150)

 at
java.io.ObjectStreamClass.checkDeserialize(ObjectStreamClass.java:768)

 at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1772
)

 at
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)

 at
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)

 at
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)

 at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798
)

 at
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)

 at
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)

 at
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)

 at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798
)

 at
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)

 at
java.io.ObjectInputStream.readArray(ObjectInputStream.java:1706)

 at
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1344)

 at
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)

 at
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)

 at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798
)

 at
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)

 at
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)

 at
java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:500)

 at
org.apache.spark.streaming.DStreamGraph$$anonfun$readObject$1.apply$mcV$
sp(DStreamGraph.scala:193)

 at
org.apache.spark.streaming.DStreamGraph$$anonfun$readObject$1.apply(DStr
eamGraph.scala:189)

 at
org.apache.spark.streaming.DStreamGraph$$anonfun$readObject$1.apply(DStr
eamGraph.scala:189)

 at
org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1253)

 at
org.apache.spark.streaming.DStreamGraph.readObject(DStreamGraph.scala:18
9)

 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

 at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.jav
a:57)

 at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessor
Impl.java:43)

 at java.lang.reflect.Method.invoke(Method.java:606)

 at
java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)

 at
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)

 at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798
)

 at
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)

 at
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)

 at
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)

 at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798
)

 at
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)

 at
java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)

 at
org.apache.spark.streaming.Checkpoint$$anonfun$deserialize$2.apply(Check
point.scala:164)

 at
org.apache.spark.st

Re: LinearRegressionWithSGD and Rank Features By Importance

2016-11-04 Thread Robin East
Hi 

Do you mean the test of significance that you usually get with R output? I 
don’t think there is anything implemented in the standard MLLib libraries 
however I believe that the sparkR version provides that. See 
http://spark.apache.org/docs/1.6.2/sparkr.html#gaussian-glm-model

---
Robin East
Spark GraphX in Action Michael Malak and Robin East
Manning Publications Co.
http://www.manning.com/books/spark-graphx-in-action 






> On 4 Nov 2016, at 07:38, Carlo.Allocca  wrote:
> 
> Hi Mohit, 
> 
> Thank you for your reply. 
> OK. it means coefficient with high score are more important that other with 
> low score…
> 
> Many Thanks,
> Best Regards,
> Carlo
> 
> 
>> On 3 Nov 2016, at 20:41, Mohit Jaggi  wrote:
>> 
>> For linear regression, it should be fairly easy. Just sort the co-efficients 
>> :)
>> 
>> Mohit Jaggi
>> Founder,
>> Data Orchard LLC
>> www.dataorchardllc.com
>> 
>> 
>> 
>> 
>>> On Nov 3, 2016, at 3:35 AM, Carlo.Allocca  wrote:
>>> 
>>> Hi All,
>>> 
>>> I am using SPARK and in particular the MLib library.
>>> 
>>> import org.apache.spark.mllib.regression.LabeledPoint;
>>> import org.apache.spark.mllib.regression.LinearRegressionModel;
>>> import org.apache.spark.mllib.regression.LinearRegressionWithSGD;
>>> 
>>> For my problem I am using the LinearRegressionWithSGD and I would like to 
>>> perform a “Rank Features By Importance”.
>>> 
>>> I checked the documentation and it seems that does not provide such methods.
>>> 
>>> Am I missing anything?  Please, could you provide any help on this?
>>> Should I change the approach?
>>> 
>>> Many Thanks in advance,
>>> 
>>> Best Regards,
>>> Carlo
>>> 
>>> 
>>> -- The Open University is incorporated by Royal Charter (RC 000391), an 
>>> exempt charity in England & Wales and a charity registered in Scotland (SC 
>>> 038302). The Open University is authorised and regulated by the Financial 
>>> Conduct Authority.
>>> 
>>> -
>>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>> 
>> 
> 
> 
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> 



Re: Delegation Token renewal in yarn-cluster

2016-11-04 Thread Zsolt Tóth
I checked the logs of my tests, and found that the Spark schedules the
token refresh based on the renew-interval property, not the max-lifetime.

The settings in my tests:
dfs.namenode.delegation.key.update-interval=52
dfs.namenode.delegation.token.max-lifetime=102
dfs.namenode.delegation.token.renew-interval=52

During the job submission, spark.yarn.token.renewal.interval is set:
2016-11-04 09:12:25 INFO  Client:59 - Renewal Interval set to 520036

Then, it takes ~0.75*spark.yarn.token.renewal.interval to schedule the
token refresh.

2016-11-04 09:12:37 INFO  ExecutorDelegationTokenUpdater:59 - Scheduling
token refresh from HDFS in 404251 millis.
...
2016-11-04 09:19:21 INFO  ExecutorDelegationTokenUpdater:59 - Reading new
delegation tokens from ...
...
2016-11-04 09:19:21 INFO  ExecutorDelegationTokenUpdater:59 - Scheduling
token refresh from HDFS in 390064 millis.
...
2016-11-04 09:25:52 INFO  ExecutorDelegationTokenUpdater:59 - Reading new
delegation tokens from ...
...
2016-11-04 09:25:52 INFO  ExecutorDelegationTokenUpdater:59 - Scheduling
token refresh from HDFS in 390022 millis.

This was what confused me in the first place. Why does Spark ask for new
tokens based on the renew-interval instead of the max-lifetime?


2016-11-04 2:37 GMT+01:00 Marcelo Vanzin :

> On Thu, Nov 3, 2016 at 3:47 PM, Zsolt Tóth 
> wrote:
> > What is the purpose of the delegation token renewal (the one that is done
> > automatically by Hadoop libraries, after 1 day by default)? It seems
> that it
> > always happens (every day) until the token expires, no matter what. I'd
> > probably find an answer to that in a basic Hadoop security description.
>
> I'm not sure and I never really got a good answer to that (I had the
> same question in the past). My best guess is to limit how long an
> attacker can do bad things if he gets hold of a delegation token. But
> IMO if an attacker gets a delegation token, that's pretty bad
> regardless of how long he can use it...
>
> > I have a feeling that giving the keytab to Spark bypasses the concept
> behind
> > delegation tokens. As I understand, the NN basically says that "your
> > application can access hdfs with this delegation token, but only for 7
> > days".
>
> I'm not sure why there's a 7 day limit either, but let's assume
> there's a good reason. Basically the app, at that point, needs to
> prove to the NN it has a valid kerberos credential. Whether that's
> from someone typing their password into a terminal, or code using a
> keytab, it doesn't really matter. If someone was worried about that
> user being malicious they'd disable the user's login in the KDC.
>
> This feature is needed because there are apps that need to keep
> running, unattended, for longer than HDFS's max lifetime setting.
>
> --
> Marcelo
>


Vector is not found in case class after import

2016-11-04 Thread Yan Facai
 Hi,
My spark-shell version is 2.0.1.

I import the Vector and hope to use it in case class, while spark-shell
throws an error: not found.

scala> import org.apache.spark.ml.linalg.{Vector => OldVector}
import org.apache.spark.ml.linalg.{Vector=>OldVector}

scala> case class Movie(vec: OldVector)
:11: error: not found: type OldVector
   case class Movie(vec: OldVector)
 ^

Is it a bug?


Re: LinearRegressionWithSGD and Rank Features By Importance

2016-11-04 Thread Carlo . Allocca
Hi Mohit, 

Thank you for your reply. 
OK. it means coefficient with high score are more important that other with low 
score…

Many Thanks,
Best Regards,
Carlo


> On 3 Nov 2016, at 20:41, Mohit Jaggi  wrote:
> 
> For linear regression, it should be fairly easy. Just sort the co-efficients 
> :)
> 
> Mohit Jaggi
> Founder,
> Data Orchard LLC
> www.dataorchardllc.com
> 
> 
> 
> 
>> On Nov 3, 2016, at 3:35 AM, Carlo.Allocca  wrote:
>> 
>> Hi All,
>> 
>> I am using SPARK and in particular the MLib library.
>> 
>> import org.apache.spark.mllib.regression.LabeledPoint;
>> import org.apache.spark.mllib.regression.LinearRegressionModel;
>> import org.apache.spark.mllib.regression.LinearRegressionWithSGD;
>> 
>> For my problem I am using the LinearRegressionWithSGD and I would like to 
>> perform a “Rank Features By Importance”.
>> 
>> I checked the documentation and it seems that does not provide such methods.
>> 
>> Am I missing anything?  Please, could you provide any help on this?
>> Should I change the approach?
>> 
>> Many Thanks in advance,
>> 
>> Best Regards,
>> Carlo
>> 
>> 
>> -- The Open University is incorporated by Royal Charter (RC 000391), an 
>> exempt charity in England & Wales and a charity registered in Scotland (SC 
>> 038302). The Open University is authorised and regulated by the Financial 
>> Conduct Authority.
>> 
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>> 
> 


-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org