Re: MLlib mission and goals

2017-01-31 Thread Seth Hendrickson
I agree with what Sean said about not supporting arbitrarily many
algorithms. I think the goal of MLlib should be to support only core
algorithms for machine learning. Ideally Spark ML provides a relatively
small set of algorithms that are heavily optimized, and also provides a
framework that makes it easy for users to extend and build their own
packages and algos when they need to. Spark ML is already quite good for
this. We have of course been doing a lot of work migrating to this new API,
and now that we are approaching full parity, it would be good to shift the
focus to performance as others have noted. Supporting a few algorithms that
perform very well is significantly better than supporting many algorithms
with moderate performance, IMO.

I also think a more complete, optimized distributed linear algebra library
would be a great asset, but it may be a longer-term goal. A performance
framework for regression testing would be great, but keeping it up to date
is difficult.

Thanks for kicking this thread off Joseph!

On Tue, Jan 24, 2017 at 7:30 PM, Joseph Bradley wrote:

> *Re: performance measurement framework*
> We (Databricks) used to use spark-perf, but that was mainly for the
> RDD-based API.  We've now switched to spark-sql-perf, which does include
> some ML benchmarks despite the project name.  I'll see about updating the
> project README to document how to run MLlib tests.
>
>
> On Tue, Jan 24, 2017 at 6:02 PM, bradc  wrote:
>
>> I believe one of the higher-level goals of Spark MLlib should be to
>> improve the efficiency of the ML algorithms that already exist. Currently
>> ML has reasonable coverage of the important core algorithms. The work to
>> reach feature parity for the DataFrame-based API and model persistence is
>> also important.
>>
>> Apache Spark needs to use higher-level BLAS3 and LAPACK routines instead
>> of BLAS1 & BLAS2. For a long time we've used the concept of compute
>> intensity (compute_intensity = FP_operations/Word) to help look at the
>> performance of the underlying compute kernels (see the papers referenced
>> below). It has been shown in many implementations that better performance
>> and scalability, and a huge reduction in memory pressure, can be achieved
>> by using higher-level BLAS3 or LAPACK routines in both single-node and
>> distributed computations.
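>>
>> As a back-of-the-envelope illustration of compute intensity (a sketch only,
>> not tied to any Spark internals; the sizes are arbitrary), compare the
>> FLOP-to-word ratios of a BLAS1 dot product and a BLAS3 matrix multiply:
>>
>> // Rough compute-intensity estimates: FP operations per word of memory traffic.
>> // BLAS1 _DOT on length-n vectors: ~2n flops over ~2n words read => intensity ~1.
>> def dotIntensity(n: Long): Double = (2.0 * n) / (2.0 * n)
>>
>> // BLAS3 _GEMM on n x n matrices: ~2n^3 flops over ~3n^2 words (A and B read,
>> // C written) => intensity ~2n/3, which grows with the matrix/block size.
>> def gemmIntensity(n: Long): Double = (2.0 * n * n * n) / (3.0 * n * n)
>>
>> println(dotIntensity(4096))   // 1.0   (memory bound)
>> println(gemmIntensity(4096))  // ~2730 (compute bound)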
>>
>> I performed a survey of some of Apache Spark's ML algorithms.
>> Unfortunately most of the ML algorithms are implemented with BLAS1 or BLAS2
>> routines which have very low compute intensity. BLAS2 and BLAS1 routines
>> require a lot more memory bandwidth and will not achieve peak performance
>> on x86, GPUs, or any other processor.
>>
>> Apache Spark 2.1.0 ML routines & BLAS Routines
>>
>> ALS (Alternating Least Squares matrix factorization)
>>
>>- BLAS2: _SPR, _TPSV
>>- BLAS1: _AXPY, _DOT, _SCAL, _NRM2
>>
>> Logistic regression classification
>>
>>- BLAS2: _GEMV
>>- BLAS1: _DOT, _SCAL
>>
>> Generalized linear regression
>>
>>- BLAS1: _DOT
>>
>> Gradient-boosted tree regression
>>
>>- BLAS1: _DOT
>>
>> GraphX SVD++
>>
>>- BLAS1: _AXPY, _DOT,_SCAL
>>
>> Neural Net Multi-layer Perceptron
>>
>>- BLAS3: _GEMM
>>- BLAS2: _GEMV
>>
>> Only the Neural Net Multi-layer Perceptron uses a BLAS3 matrix multiply
>> (DGEMM). BTW, the underscores are replaced by S, D, C, or Z for 32-bit
>> real, 64-bit real, 32-bit complex, and 64-bit complex operations,
>> respectively.
>>
>> Refactoring the algorithms to use BLAS3 routines or higher level LAPACK
>> routines will require coding changes to use sub-block algorithms but the
>> performance benefits can be great.
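>>
>> As a concrete (if simplified) sketch: a single BLAS3 call can replace a loop
>> of BLAS1/BLAS2 updates. The snippet below uses the netlib-java BLAS wrapper
>> that MLlib already depends on (as of 2.x); the matrix size and contents are
>> arbitrary, and the arrays are column-major as BLAS expects:
>>
>> import com.github.fommil.netlib.BLAS
>>
>> val n = 512
>> val a = Array.fill(n * n)(scala.util.Random.nextDouble())  // n x n, column-major
>> val c = new Array[Double](n * n)
>> // C := 1.0 * A * A + 0.0 * C -- one _GEMM call instead of n^2 _DOT calls,
>> // so every element of A is reused ~n times per load from memory.
>> BLAS.getInstance().dgemm("N", "N", n, n, n, 1.0, a, n, a, n, 0.0, c, n)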
>>
>> More at: https://blogs.oracle.com/BestPerf/entry/improving_algorithms_in_spark_ml
>> Background:
>>
>> Brad Carlile. Parallelism, compute intensity, and data vectorization.
>> SuperComputing '93, November 1993.
>>
>> John McCalpin. Memory Bandwidth and Machine Balance in Current High
>> Performance Computers. 1995.
>>
>>
>>
>
>
> --
>
> Joseph Bradley
>
> Software Engineer - Machine Learning
>
> Databricks, Inc.
>
> http://databricks.com
>


Re: Structured Streaming Source error

2017-01-31 Thread Sam Elamin
Ha, Ryan, you're everywhere: JIRA and the mailing list. I thought
multitasking was a myth!

Thanks for your help. I was indeed using different versions!

Regards
Sam

On Tue, Jan 31, 2017 at 9:48 PM, Shixiong(Ryan) Zhu  wrote:

> You used one Spark version to compile your code but ran it against another,
> newer version. As the Source APIs are not stable, Spark doesn't guarantee
> that they are binary compatible.
>
> On Tue, Jan 31, 2017 at 1:39 PM, Sam Elamin wrote:
>
>> Hi Folks
>>
>>
>> I am getting a weird error when trying to write a BigQuery Structured
>> Streaming source
>>
>>
>> Error:
>> java.lang.AbstractMethodError: com.samelamin.spark.bigquery.s
>> treaming.BigQuerySource.commit(Lorg/apache/spark/sql/executi
>> on/streaming/Offset;)V
>> at org.apache.spark.sql.execution.streaming.StreamExecution$$
>> anonfun$org$apache$spark$sql$execution$streaming$
>> StreamExecution$$constructNextBatch$1$$anonfun$apply$mcV$sp$
>> 5.apply(StreamExecution.scala:359)
>> at org.apache.spark.sql.execution.streaming.StreamExecution$$
>> anonfun$org$apache$spark$sql$execution$streaming$
>> StreamExecution$$constructNextBatch$1$$anonfun$apply$mcV$sp$
>> 5.apply(StreamExecution.scala:358)
>>
>>
>> FYI, if you are interested in Spark and BigQuery, feel free to give my
>> connector a go! I am still trying to get Structured Streaming working with
>> it as a source (hence this email), but you can use it as a sink!
>>
>>
>> Regards
>> Sam
>>
>>
>>
>>
>


Re: Structured Streaming Source error

2017-01-31 Thread Shixiong(Ryan) Zhu
You used one Spark version to compile your code but ran it against another,
newer version. As the Source APIs are not stable, Spark doesn't guarantee
that they are binary compatible.
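
A common way to avoid this is to compile against the exact Spark version the
cluster runs and mark Spark as a "provided" dependency so the cluster's own
jars are used at runtime. A minimal sbt sketch (the version shown is only an
example; use whatever your runtime is):

// build.sbt -- compile against the Spark version you deploy on
val sparkVersion = "2.1.0"  // must match the runtime Spark version

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-sql" % sparkVersion % "provided"
)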

On Tue, Jan 31, 2017 at 1:39 PM, Sam Elamin  wrote:

> Hi Folks
>
>
> I am getting a weird error when trying to write a BigQuery Structured
> Streaming source
>
>
> Error:
> java.lang.AbstractMethodError: com.samelamin.spark.bigquery.
> streaming.BigQuerySource.commit(Lorg/apache/spark/sql/
> execution/streaming/Offset;)V
> at org.apache.spark.sql.execution.streaming.
> StreamExecution$$anonfun$org$apache$spark$sql$execution$
> streaming$StreamExecution$$constructNextBatch$1$$anonfun$
> apply$mcV$sp$5.apply(StreamExecution.scala:359)
> at org.apache.spark.sql.execution.streaming.
> StreamExecution$$anonfun$org$apache$spark$sql$execution$
> streaming$StreamExecution$$constructNextBatch$1$$anonfun$
> apply$mcV$sp$5.apply(StreamExecution.scala:358)
>
>
> FYI, if you are interested in Spark and BigQuery, feel free to give my
> connector a go! I am still trying to get Structured Streaming working with
> it as a source (hence this email), but you can use it as a sink!
>
>
> Regards
> Sam
>
>
>
>


Structured Streaming Source error

2017-01-31 Thread Sam Elamin
Hi Folks


I am getting a weird error when trying to write a BigQuery Structured
Streaming source


Error:
java.lang.AbstractMethodError:
com.samelamin.spark.bigquery.streaming.BigQuerySource.commit(Lorg/apache/spark/sql/execution/streaming/Offset;)V
at
org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$constructNextBatch$1$$anonfun$apply$mcV$sp$5.apply(StreamExecution.scala:359)
at
org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$constructNextBatch$1$$anonfun$apply$mcV$sp$5.apply(StreamExecution.scala:358)


FYI, if you are interested in Spark and BigQuery, feel free to give my
connector a go! I am still trying to get Structured Streaming working with it
as a source (hence this email), but you can use it as a sink!


Regards
Sam


Call for abstracts open for Dataworks & Hadoop Summit San Jose

2017-01-31 Thread Alan Gates
The Dataworks & Hadoop summit will be in San Jose June 13-15, 2017.  The call 
for abstracts closes February 10.  You can submit an abstract at 
http://tinyurl.com/dwsj17CFA

There are tracks for Hadoop, data processing and warehousing, governance and 
security, IoT and streaming, cloud and operations, and Spark and data science.  
As always the talks will be chosen by committees from the relevant communities.

Alan.



Re: Unique Partition Id per partition

2017-01-31 Thread Michael Allman
Hi Sumit,

Can you use RDD.mapPartitionsWithIndex
(http://spark.apache.org/docs/latest/api/python/pyspark.html?highlight=rdd#pyspark.RDD.mapPartitionsWithIndex)
to solve your problem?
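
For reference, a minimal Scala sketch of that approach (PySpark's
rdd.mapPartitionsWithIndex passes the same (index, iterator) arguments;
this assumes a SparkContext named sc, e.g. in spark-shell):

// Tag every record with the index of the partition that produced it.
// The index is deterministic per partition of a given RDD, unlike id(part)
// or a timestamp.
val rdd = sc.parallelize(1 to 100, 8)

val tagged = rdd.mapPartitionsWithIndex { (partitionId, iter) =>
  iter.map(record => (partitionId, record))
}

tagged.take(5).foreach(println)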

Michael

> On Jan 31, 2017, at 9:08 AM, Chawla,Sumit  wrote:
> 
> Hi All
> 
> I have an RDD, which I partition based on some key, and then call sc.runJob
> for each partition.
> Inside this function, I assign each partition a unique key using the following:
>
> "%s_%s" % (id(part), int(round(time.time())))
>
> This is to make sure that each partition produces separate bookkeeping
> stuff, which can be aggregated by an external system. However, I sometimes
> notice multiple partition results pointing to the same partition_id. Is
> this some issue due to the way the above code is serialized by PySpark?
> What's the best way to define a unique id for each partition? I understand
> that the same executor gets multiple partitions to process, but I would
> expect the above code to produce a unique id for each partition.
> 
> 
> Regards
> Sumit Chawla
> 



Re: Error Saving Dataframe to Hive with Spark 2.0.0

2017-01-31 Thread Michael Allman
That's understandable. Maybe I can help. :)

What happens if you set `HIVE_TABLE_NAME = "default.employees"`?

Also, does that table exist before you call 
`filtered_output_timestamp.write.mode("append").saveAsTable(HIVE_TABLE_NAME)`?
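
In other words, something like this (just a sketch reusing the names from your
code, and assuming the table lives in the default database):

// Qualify the table with its database so saveAsTable resolves it unambiguously.
HIVE_TABLE_NAME = "default.employees"

filtered_output_timestamp.write
  .mode("append")
  .saveAsTable(HIVE_TABLE_NAME)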

Cheers,

Michael

> On Jan 29, 2017, at 9:52 PM, Chetan Khatri wrote:
> 
> Okay, you are saying that 2.0.0 doesn't have that patch fixed? @dev cc:
> I don't like changing the service versions every time!
> 
> Thanks.
> 
> On Mon, Jan 30, 2017 at 1:10 AM, Jacek Laskowski wrote:
> Hi, 
> 
> I think you have to upgrade to 2.1.0. There have been a few changes related
> to that error since.
> 
> Jacek 
> 
> 
> On 29 Jan 2017 9:24 a.m., "Chetan Khatri" wrote:
> Hello Spark Users,
> 
> I am getting error while saving Spark Dataframe to Hive Table:
> Hive 1.2.1
> Spark 2.0.0
> Local environment.
> Note: the job executes successfully and does what I want, but the exception
> is still raised.
> Source Code:
> 
> package com.chetan.poc.hbase
> 
> /**
>   * Created by chetan on 24/1/17.
>   */
> import org.apache.hadoop.hbase.{CellUtil, HBaseConfiguration}
> import org.apache.hadoop.hbase.mapreduce.TableInputFormat
> import org.apache.hadoop.hbase.util.Bytes
> import org.apache.hadoop.hbase.KeyValue.Type
> import org.apache.spark.sql.SparkSession
> import scala.collection.JavaConverters._
> import java.util.Date
> import java.text.SimpleDateFormat
> 
> 
> object IncrementalJob {
> val APP_NAME: String = "SparkHbaseJob"
> var HBASE_DB_HOST: String = null
> var HBASE_TABLE: String = null
> var HBASE_COLUMN_FAMILY: String = null
> var HIVE_DATA_WAREHOUSE: String = null
> var HIVE_TABLE_NAME: String = null
>   def main(args: Array[String]) {
> // Initializing HBASE Configuration variables
> HBASE_DB_HOST="127.0.0.1"
> HBASE_TABLE="university"
> HBASE_COLUMN_FAMILY="emp"
> // Initializing Hive Metastore configuration
> HIVE_DATA_WAREHOUSE = "/usr/local/hive/warehouse"
> // Initializing Hive table name - Target table
> HIVE_TABLE_NAME = "employees"
> // setting spark application
> // val sparkConf = new SparkConf().setAppName(APP_NAME).setMaster("local")
> //initialize the spark context
> //val sparkContext = new SparkContext(sparkConf)
> //val sqlContext = new org.apache.spark.sql.SQLContext(sparkContext)
> // Enable Hive with Hive warehouse in SparkSession
> val spark = SparkSession.builder()
>   .appName(APP_NAME)
>   .config("hive.metastore.warehouse.dir", HIVE_DATA_WAREHOUSE)
>   .config("spark.sql.warehouse.dir", HIVE_DATA_WAREHOUSE)
>   .enableHiveSupport()
>   .getOrCreate()
> import spark.implicits._
> import spark.sql
> 
> val conf = HBaseConfiguration.create()
> conf.set(TableInputFormat.INPUT_TABLE, HBASE_TABLE)
> conf.set(TableInputFormat.SCAN_COLUMNS, HBASE_COLUMN_FAMILY)
> // Load an RDD of rowkey/result (ImmutableBytesWritable, Result) tuples from the table
> val hBaseRDD = spark.sparkContext.newAPIHadoopRDD(conf, 
> classOf[TableInputFormat],
>   classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
>   classOf[org.apache.hadoop.hbase.client.Result])
> 
> println(hBaseRDD.count())
> //hBaseRDD.foreach(println)
> 
> //keyValue is a RDD[java.util.list[hbase.KeyValue]]
> val keyValue = hBaseRDD.map(x => x._2).map(_.list)
> 
> //outPut is a RDD[String], in which each line represents a record in HBase
> val outPut = keyValue.flatMap(x =>  x.asScala.map(cell =>
> 
>   HBaseResult(
> Bytes.toInt(CellUtil.cloneRow(cell)),
> Bytes.toStringBinary(CellUtil.cloneFamily(cell)),
> Bytes.toStringBinary(CellUtil.cloneQualifier(cell)),
> cell.getTimestamp,
>     new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSS").format(new Date(cell.getTimestamp.toLong)),
> Bytes.toStringBinary(CellUtil.cloneValue(cell)),
> Type.codeToType(cell.getTypeByte).toString
> )
>   )
> ).toDF()
> // Output dataframe
> outPut.show
> 
> // get timestamp
> val datetimestamp_threshold = "2016-08-25 14:27:02:001"
> val datetimestampformat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSS").parse(datetimestamp_threshold).getTime()
> 
> // Result set filtering based on timestamp
> val filtered_output_timestamp = outPut.filter($"colDatetime" >= 
> datetimestampformat)
> // Result set filtering based on rowkey
> val filtered_output_row = 
> outPut.filter($"colDatetime".between(1668493360,1668493365))
> 
> 
> // Saving Dataframe to Hive Table Successfully.
> 
> filtered_output_timestamp.write.mode("append").saveAsTable(HIVE_TABLE_NAME)
>   }
>   case class HBaseResult(rowkey: Int, colFamily: String, colQualifier: 
> String, colDatetime: Long, colDatetimeStr: String, colValue: String, colType: 
> String)
> }
> 
> 

Unique Partition Id per partition

2017-01-31 Thread Chawla,Sumit
Hi All

I have an RDD, which I partition based on some key, and then call sc.runJob
for each partition.
Inside this function, I assign each partition a unique key using the following:

"%s_%s" % (id(part), int(round(time.time())))

This is to make sure that each partition produces separate bookkeeping stuff,
which can be aggregated by an external system. However, I sometimes notice
multiple partition results pointing to the same partition_id. Is this some
issue due to the way the above code is serialized by PySpark? What's the best
way to define a unique id for each partition? I understand that the same
executor gets multiple partitions to process, but I would expect the above
code to produce a unique id for each partition.



Regards
Sumit Chawla


[SQL][ML] Pipeline performance regression between 1.6 and 2.x

2017-01-31 Thread Maciej Szymkiewicz
Hi everyone,

While experimenting with ML pipelines, I am seeing a significant performance
regression when switching from 1.6.x to 2.x.

import org.apache.spark.ml.{Pipeline, PipelineStage}
import org.apache.spark.ml.feature.{OneHotEncoder, StringIndexer, VectorAssembler}

val df = (1 to 40).foldLeft(Seq((1, "foo"), (2, "bar"), (3, "baz")).toDF("id", "x0")) {
  (df, i) => df.withColumn(s"x$i", $"x0")
}

val indexers = df.columns.tail.map(c => new StringIndexer()
  .setInputCol(c)
  .setOutputCol(s"${c}_indexed")
  .setHandleInvalid("skip"))

val encoders = indexers.map(indexer => new OneHotEncoder()
  .setInputCol(indexer.getOutputCol)
  .setOutputCol(s"${indexer.getOutputCol}_encoded")
  .setDropLast(true))

val assembler = new VectorAssembler().setInputCols(encoders.map(_.getOutputCol))
val stages: Array[PipelineStage] = indexers ++ encoders :+ assembler

new Pipeline().setStages(stages).fit(df).transform(df).show

Task execution time is comparable and the executors are idle most of the
time, so it looks like a problem with the optimizer. Is this a known issue?
Are there any changes I've missed that could lead to this behavior?
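
One way to narrow this down (a rough sketch, reusing df and stages from above)
is to time planning and execution separately; queryExecution.executedPlan
forces analysis and optimization without launching any job, so a large gap
there points at the optimizer rather than the executors:

// Crude timing helper to separate driver-side planning from actual execution.
def time[T](label: String)(body: => T): T = {
  val start = System.nanoTime()
  val result = body
  println(f"$label: ${(System.nanoTime() - start) / 1e9}%.2f s")
  result
}

val model       = time("fit")(new Pipeline().setStages(stages).fit(df))
val transformed = time("transform (lazy)")(model.transform(df))
time("planning")(transformed.queryExecution.executedPlan)  // analysis + optimization only
time("execution")(transformed.count())                     // actually runs the job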

-- 
Best,
Maciej





Spark SQL Dataframe resulting from an except( ) is unusable

2017-01-31 Thread Vinayak Joshi5
With Spark 2.x, I construct a Dataframe from a sample libsvm file:

scala> val higgsDF = spark.read.format("libsvm").load("higgs.libsvm")
higgsDF: org.apache.spark.sql.DataFrame = [label: double, features: vector]


Then, build a new dataframe that involves an except():

scala> val train_df = higgsDF.sample(false, 0.7, 42)
train_df: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [label: double, features: vector]

scala> val test_df = higgsDF.except(train_df)
test_df: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [label: double, features: vector]

Now, most operations on the test_df fail with this exception:

scala> test_df.show()
java.lang.RuntimeException: no default for type org.apache.spark.ml.linalg.VectorUDT@3bfc3ba7
  at org.apache.spark.sql.catalyst.expressions.Literal$.default(literals.scala:179)
  at org.apache.spark.sql.catalyst.planning.ExtractEquiJoinKeys$$anonfun$4.apply(patterns.scala:117)
  at org.apache.spark.sql.catalyst.planning.ExtractEquiJoinKeys$$anonfun$4.apply(patterns.scala:110)
  ...

Debugging this, I see that this is the schema of this dataframe:

scala> test_df.schema
res4: org.apache.spark.sql.types.StructType = StructType(StructField(label,DoubleType,true), StructField(features,org.apache.spark.ml.linalg.VectorUDT@3bfc3ba7,true))

Looking a little deeper, the error occurs because the QueryPlanner ends up 
inside

  object ExtractEquiJoinKeys 
(/spark/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala)

where it processes a LeftAnti Join. Then there is an attempt to generate a 
default Literal value for the org.apache.spark.ml.linalg.VectorUDT 
DataType which fails with the above exception. This is because there is no 
match for the VectorUDT in

def default(dataType: DataType): Literal = {..}
(/spark/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala)


Any processing on this dataframe that causes Spark to build a query plan 
(i.e. almost all productive uses of this dataframe) fails due to this 
exception. 

Is it a miss in the Literal implementation that it does not handle
UserDefinedTypes, or was it left out intentionally? Is there a way to get
around this problem? It seems to be present in all 2.x versions.
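
For the plain train/test use case itself, randomSplit sidesteps the anti-join
(and therefore the missing Literal default) entirely; a minimal sketch:

// randomSplit produces disjoint splits by per-row sampling, with no join in
// the plan, so the VectorUDT column never reaches Literal.default.
val Array(train_df, test_df) = higgsDF.randomSplit(Array(0.7, 0.3), seed = 42)
test_df.show()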

Regards,
Vinayak Joshi