Re: Spark vs MongoDB: saving DataFrame to db raises missing database name exception

2017-01-16 Thread Marco Mistroni
Uh. Many thanks. Will try it out.

On 17 Jan 2017 6:47 am, "Palash Gupta"  wrote:

> Hi Marco,
>
> What is the user and password you are using for mongodb connection? Did
> you enable authorization?
>
> Better to include user & pass in mongo url.
>
> I remember I tested with python successfully.
>
> Best Regards,
> Palash
>
>
> Sent from Yahoo Mail on Android
> 
>
> On Tue, 17 Jan, 2017 at 5:37 am, Marco Mistroni
>  wrote:
> hi all
>  I have the following snippet which loads a dataframe from a csv file
> and tries to save
> it to mongodb.
> For some reason, the MongoSpark.save method raises the following exception
>
> Exception in thread "main" java.lang.IllegalArgumentException: Missing
> database name. Set via the 'spark.mongodb.output.uri' or
> 'spark.mongodb.output.database' property
> at com.mongodb.spark.config.MongoCompanionConfig$class.databaseName(
> MongoCompanionConfig.scala:260)
> at com.mongodb.spark.config.WriteConfig$.databaseName(
> WriteConfig.scala:36)
>
> Which is bizarre as I'm pretty sure I am setting all the necessary
> properties in the SparkConf
>
> could you kindly assist?
>
> I am running Spark 2.0.1 locally with a local mongodb instance running at
> 127.0.0.1:27017
> I am using version 2.0.0 of mongo-spark-connector
> I am running on Scala 2.11
>
> kr
>
> val spark = SparkSession
>  .builder()
>  .master("local")
>  .appName("Spark Mongo Example")
>  .getOrCreate()
> spark.conf.set("spark.mongodb.input.uri", "mongodb://127.0.0.1:27017/
> ")
> spark.conf.set("spark.mongodb.output.uri", "mongodb://127.0.0.1:27017/
> ")
> spark.conf.set("spark.mongodb.output.database", "test")
>
> println(s"SparkPRoperties:${spark.conf.getAll}")
>
>
> val df = getDataFrame(spark) // Loading any dataframe from a file
>
> df.printSchema()
>
> println(s"Head:${df.head()}")
> println(s"Count:${df.count()}")
> println("##  SAVING TO MONGODB #")
> import com.mongodb.spark.config._
>
> import com.mongodb.spark.config._
>
> val writeConfig = WriteConfig(Map("collection" -> "spark",
> "writeConcern.w" -> "majority"), Some(WriteConfig(spark.sparkContext)))
> MongoSpark.save(df, writeConfig)
>
>
>
>


Re: Spark vs MongoDB: saving DataFrame to db raises missing database name exception

2017-01-16 Thread Palash Gupta
Hi,
Example:
dframe = sqlContext.read.format("com.mongodb.spark.sql.DefaultSource") \
    .option("spark.mongodb.input.uri", "mongodb://user:pass@172.26.7.192:27017/db_name.collection_name") \
    .load()
dframe.printSchema()
One more thing: if you create a new db in mongo, please create a collection with at
least one record in it. Otherwise mongo may not keep that db once the online session dies.
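
For the write side of Marco's Scala snippet, a similar approach is to put the database
and collection into the WriteConfig options explicitly rather than relying on the session
configuration. A minimal sketch (user, pass, host, db_name and collection_name are
placeholders as in the read example above, and the option keys are assumed to match the
connector's documented WriteConfig options):

import com.mongodb.spark.MongoSpark
import com.mongodb.spark.config.WriteConfig

// build the write configuration with the database and collection named explicitly
val writeConfig = WriteConfig(Map(
  "uri" -> "mongodb://user:pass@172.26.7.192:27017/",
  "database" -> "db_name",
  "collection" -> "collection_name",
  "writeConcern.w" -> "majority"))
MongoSpark.save(df, writeConfig) // df is the DataFrame to persist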
//Palash

Sent from Yahoo Mail on Android 
 
  On Tue, 17 Jan, 2017 at 12:44 pm, Palash Gupta 
wrote:
Hi Marco,
What is the user and password you are using for mongodb connection? Did you 
enable authorization?
Better to include user & pass in mongo url.
I remember I tested with python successfully.
Best Regards,
Palash

Sent from Yahoo Mail on Android 
 
  On Tue, 17 Jan, 2017 at 5:37 am, Marco Mistroni wrote:   
hi all
 I have the following snippet which loads a dataframe from a csv file and
tries to save
it to mongodb.
For some reason, the MongoSpark.save method raises the following exception

Exception in thread "main" java.lang.IllegalArgumentException: Missing database 
name. Set via the 'spark.mongodb.output.uri' or 'spark.mongodb.output.database' 
property
    at 
com.mongodb.spark.config.MongoCompanionConfig$class.databaseName(MongoCompanionConfig.scala:260)
    at com.mongodb.spark.config.WriteConfig$.databaseName(WriteConfig.scala:36)

Which is bizarre as I'm pretty sure I am setting all the necessary properties
in the SparkConf

could you kindly assist?

I am running Spark 2.0.1 locally with a local mongodb instance running at 
127.0.0.1:27017
I am using version 2.0.0 of mongo-spark-connector
I am running on Scala 2.11

kr

val spark = SparkSession
 .builder()
 .master("local")
 .appName("Spark Mongo Example")
 .getOrCreate() 
    spark.conf.set("spark.mongodb.input.uri", "mongodb://127.0.0.1:27017/")
    spark.conf.set("spark.mongodb.output.uri", "mongodb://127.0.0.1:27017/")
    spark.conf.set("spark.mongodb.output.database", "test")
    
    println(s"SparkPRoperties:${spark.conf.getAll}")
    
    
    val df = getDataFrame(spark) // Loading any dataframe from a file
    
    df.printSchema()

    println(s"Head:${df.head()}")
    println(s"Count:${df.count()}")
    println("##  SAVING TO MONGODB #")
    import com.mongodb.spark.config._
    
    import com.mongodb.spark.config._

    val writeConfig = WriteConfig(Map("collection" -> "spark", "writeConcern.w" 
-> "majority"), Some(WriteConfig(spark.sparkContext)))
    MongoSpark.save(df, writeConfig)



  
  


Re: Spark vs MongoDB: saving DataFrame to db raises missing database name exception

2017-01-16 Thread Palash Gupta
Hi Marco,
What is the user and password you are using for mongodb connection? Did you 
enable authorization?
Better to include user & pass in mongo url.
I remember I tested with python successfully.
Best Regards,
Palash

Sent from Yahoo Mail on Android 
 
  On Tue, 17 Jan, 2017 at 5:37 am, Marco Mistroni wrote:   
hi all
 I have the following snippet which loads a dataframe from a csv file and
tries to save
it to mongodb.
For some reason, the MongoSpark.save method raises the following exception

Exception in thread "main" java.lang.IllegalArgumentException: Missing database 
name. Set via the 'spark.mongodb.output.uri' or 'spark.mongodb.output.database' 
property
    at 
com.mongodb.spark.config.MongoCompanionConfig$class.databaseName(MongoCompanionConfig.scala:260)
    at com.mongodb.spark.config.WriteConfig$.databaseName(WriteConfig.scala:36)

Which is bizarre as I'm pretty sure I am setting all the necessary properties
in the SparkConf

could you kindly assist?

I am running Spark 2.0.1 locally with a local mongodb instance running at 
127.0.0.1:27017
I am using version 2.0.0 of mongo-spark-connector
I am running on Scala 2.11

kr

val spark = SparkSession
 .builder()
 .master("local")
 .appName("Spark Mongo Example")
 .getOrCreate() 
    spark.conf.set("spark.mongodb.input.uri", "mongodb://127.0.0.1:27017/")
    spark.conf.set("spark.mongodb.output.uri", "mongodb://127.0.0.1:27017/")
    spark.conf.set("spark.mongodb.output.database", "test")
    
    println(s"SparkPRoperties:${spark.conf.getAll}")
    
    
    val df = getDataFrame(spark) // Loading any dataframe from a file
    
    df.printSchema()

    println(s"Head:${df.head()}")
    println(s"Count:${df.count()}")
    println("##  SAVING TO MONGODB #")
    import com.mongodb.spark.config._
    
    import com.mongodb.spark.config._

    val writeConfig = WriteConfig(Map("collection" -> "spark", "writeConcern.w" 
-> "majority"), Some(WriteConfig(spark.sparkContext)))
    MongoSpark.save(df, writeConfig)



  


GroupBy and Spark Performance issue

2017-01-16 Thread KhajaAsmath Mohammed
Hi,

I am trying to group data in Spark and find the maximum value for each group
of data. I have to use group by as I need to transpose based on the values.

I tried repartitioning the data by increasing the number of partitions from 1 to 1.
The job runs till the below stage and then takes a long time to move ahead. I was never
successful; the job gets killed after some time with GC overhead limit issues.


[image: Inline image 1]

I increased memory limits too. Not sure what is going wrong; can anyone guide
me through the right approach?
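
If the goal is just the per-group maximum, one way to avoid shipping every raw row
through a wide groupBy is to use an aggregate that pre-combines on the map side.
A minimal sketch (df, "key" and "value" are hypothetical names, not from the original mail):

import org.apache.spark.sql.functions.max

// groupBy + agg uses partial (map-side) aggregation, so only one row per key
// and partition is shuffled, which is far lighter on memory than collecting
// whole groups before finding the maximum.
val maxPerGroup = df.groupBy("key").agg(max("value").alias("max_value"))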

Thanks,
Asmath


Weird experience Hive with Spark Transformations

2017-01-16 Thread Chetan Khatri
Hello,

I have following services are configured and installed successfully:

Hadoop 2.7.x
Spark 2.0.x
HBase 1.2.4
Hive 1.2.1

*Installation Directories:*

/usr/local/hadoop
/usr/local/spark
/usr/local/hbase

*Hive Environment variables:*

#HIVE VARIABLES START
export HIVE_HOME=/usr/local/hive
export PATH=$PATH:$HIVE_HOME/bin
#HIVE VARIABLES END

So I can access Hive from anywhere, as the environment variables are
configured. Now, if I start my spark-shell and hive from the location
/usr/local/hive, then both work fine against the Hive metastore; otherwise,
from wherever else I start spark-shell, Spark creates its own metastore.

I.e., I am reading from HBase and writing to Hive using Spark. I don't know
why this weird issue happens.
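
A common way to make the shared metastore visible regardless of the working directory is
to put hive-site.xml on Spark's classpath (for example by copying it into $SPARK_HOME/conf)
and enable Hive support on the session. A sketch under that assumption (the app name below
is made up):

import org.apache.spark.sql.SparkSession

// Assumes $SPARK_HOME/conf contains the same hive-site.xml the Hive CLI uses,
// so Spark resolves the shared metastore instead of creating a local one
// in whatever directory spark-shell was started from.
val spark = SparkSession.builder()
  .appName("HBaseToHiveJob")
  .enableHiveSupport()
  .getOrCreate()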




Thanks.


Re: About saving DataFrame to Hive 1.2.1 with Spark 2.0.1

2017-01-16 Thread Chetan Khatri
Hello Spark Folks,

Another weird experience I have with Spark and SQLContext: when I create a
DataFrame, sometimes this error/exception is thrown and sometimes it is not!

scala> import sqlContext.implicits._
import sqlContext.implicits._

scala> val stdDf = sqlContext.createDataFrame(rowRDD,empSchema.struct);
17/01/17 10:27:15 ERROR metastore.RetryingHMSHandler:
AlreadyExistsException(message:Database default already exists)
at
org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.create_database(HiveMetaStore.java:891)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at
org.apache.hadoop.hive.metastore.RetryingHMSHandler.invoke(RetryingHMSHandler.java:107)
at com.sun.proxy.$Proxy21.create_database(Unknown Source)
at
org.apache.hadoop.hive.metastore.HiveMetaStoreClient.createDatabase(HiveMetaStoreClient.java:644)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at
org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.invoke(RetryingMetaStoreClient.java:156)
at com.sun.proxy.$Proxy22.createDatabase(Unknown Source)
at org.apache.hadoop.hive.ql.metadata.Hive.createDatabase(Hive.java:306)
at
org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$createDatabase$1.apply$mcV$sp(HiveClientImpl.scala:309)
at
org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$createDatabase$1.apply(HiveClientImpl.scala:309)
at
org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$createDatabase$1.apply(HiveClientImpl.scala:309)
at
org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$withHiveState$1.apply(HiveClientImpl.scala:280)
at
org.apache.spark.sql.hive.client.HiveClientImpl.liftedTree1$1(HiveClientImpl.scala:227)
at
org.apache.spark.sql.hive.client.HiveClientImpl.retryLocked(HiveClientImpl.scala:226)
at
org.apache.spark.sql.hive.client.HiveClientImpl.withHiveState(HiveClientImpl.scala:269)
at
org.apache.spark.sql.hive.client.HiveClientImpl.createDatabase(HiveClientImpl.scala:308)
at
org.apache.spark.sql.hive.HiveExternalCatalog$$anonfun$createDatabase$1.apply$mcV$sp(HiveExternalCatalog.scala:99)
at
org.apache.spark.sql.hive.HiveExternalCatalog$$anonfun$createDatabase$1.apply(HiveExternalCatalog.scala:99)
at
org.apache.spark.sql.hive.HiveExternalCatalog$$anonfun$createDatabase$1.apply(HiveExternalCatalog.scala:99)
at
org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:72)
at
org.apache.spark.sql.hive.HiveExternalCatalog.createDatabase(HiveExternalCatalog.scala:98)
at
org.apache.spark.sql.catalyst.catalog.SessionCatalog.createDatabase(SessionCatalog.scala:147)
at
org.apache.spark.sql.catalyst.catalog.SessionCatalog.(SessionCatalog.scala:89)
at
org.apache.spark.sql.hive.HiveSessionCatalog.(HiveSessionCatalog.scala:51)
at
org.apache.spark.sql.hive.HiveSessionState.catalog$lzycompute(HiveSessionState.scala:49)
at
org.apache.spark.sql.hive.HiveSessionState.catalog(HiveSessionState.scala:48)
at
org.apache.spark.sql.hive.HiveSessionState$$anon$1.(HiveSessionState.scala:63)
at
org.apache.spark.sql.hive.HiveSessionState.analyzer$lzycompute(HiveSessionState.scala:63)
at
org.apache.spark.sql.hive.HiveSessionState.analyzer(HiveSessionState.scala:62)
at
org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:49)
at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:64)
at org.apache.spark.sql.SparkSession.createDataFrame(SparkSession.scala:542)
at org.apache.spark.sql.SparkSession.createDataFrame(SparkSession.scala:302)
at org.apache.spark.sql.SQLContext.createDataFrame(SQLContext.scala:337)
at
$line28.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.(:43)
at
$line28.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.(:48)
at
$line28.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.(:50)
at $line28.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.(:52)
at $line28.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.(:54)
at $line28.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw.(:56)
at $line28.$read$$iw$$iw$$iw$$iw$$iw$$iw.(:58)
at $line28.$read$$iw$$iw$$iw$$iw$$iw.(:60)
at $line28.$read$$iw$$iw$$iw$$iw.(:62)
at $line28.$read$$iw$$iw$$iw.(:64)
at $line28.$read$$iw$$iw.(:66)
at $line28.$read$$iw.(:68)
at $line28.$read.(:70)
at $line28.$read$.(:74)
at $line28.$read$.()
at $line28.$eval$.$print$lzycompute(:7)
at $line28.$eval$.$print(:6)
at $line28.$eval.$print()
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)

ScalaReflectionException (class not found) error for user class in spark 2.1.0

2017-01-16 Thread Koert Kuipers
I am experiencing a ScalaReflectionException when doing an
aggregation on a Spark SQL DataFrame. The error looks like this:

Exception in thread "main" scala.ScalaReflectionException: class 
in JavaMirror with sun.misc.Launcher$AppClassLoader@28d93b30 of type class
sun.misc.Launcher$AppClassLoader with classpath [] not found.
at
scala.reflect.internal.Mirrors$RootsBase.staticClass(Mirrors.scala:123)
at
scala.reflect.internal.Mirrors$RootsBase.staticClass(Mirrors.scala:22)
at 
at
scala.reflect.api.TypeTags$WeakTypeTagImpl.tpe$lzycompute(TypeTags.scala:232)
at scala.reflect.api.TypeTags$WeakTypeTagImpl.tpe(TypeTags.scala:232)
at
org.apache.spark.sql.SQLImplicits$$typecreator9$1.apply(SQLImplicits.scala:127)
at
scala.reflect.api.TypeTags$WeakTypeTagImpl.tpe$lzycompute(TypeTags.scala:232)
at scala.reflect.api.TypeTags$WeakTypeTagImpl.tpe(TypeTags.scala:232)
at
org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.apply(ExpressionEncoder.scala:49)
at
org.apache.spark.sql.SQLImplicits.newProductSeqEncoder(SQLImplicits.scala:127)
at 


some things to note:
*  contains driver-class-path as indicated by me using
spark-submit, and all the jars that spark added. but it does not contain my
own assembly jar which contains 
* the class that is missing is a simple case class that is only used in the
aggregators on the executors, never driver-side
* i am running spark 2.1.0 with java 8 on yarn, but i can reproduce the
same error in local mode

what is this classloader that excludes my jar?
the error looks somewhat like SPARK-8470, but I am not using Hive, and
Spark was not built with Hive support.

i can fix the error by adding my assembly jar to driver-classpath, but that
feels like a hack.

thanks,
koert


Re: partition size inherited from parent: auto coalesce

2017-01-16 Thread Takeshi Yamamuro
Hi,

Coalescing does not happen automatically; you need to control the number of
partitions yourself.
Basically, #partitions respects the `spark.default.parallelism` number, which by
default is the #cores of your computer.
http://spark.apache.org/docs/latest/configuration.html#execution-behavior
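
For the example below, doing the coalesce by hand after the selective filter would look
something like this (1 is an arbitrary target; pick whatever fits the filtered volume):

// shrink the filtered RDD manually; without shuffle=true this stays a narrow dependency
val ngauss_rdd2 = ngauss_rdd.filter(x => x > 4.0).coalesce(1)
ngauss_rdd2.partitions.size // 1 instead of the inherited 4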

// maropu

On Tue, Jan 17, 2017 at 11:58 AM, Suzen, Mehmet  wrote:

> Hello List,
>
>  I was wondering what the design principle is whereby the partition size of
> an RDD is inherited from the parent.  See one simple example below
> [*]. 'ngauss_rdd2' has significantly less data; intuitively, in such
> cases, shouldn't Spark invoke coalesce automatically for performance?
> What would be the configuration option for this, if there is any?
>
> Best,
> -m
>
> [*]
> // Generate 1 million Gaussian random numbers
> import util.Random
> Random.setSeed(4242)
> val ngauss = (1 to 1e6.toInt).map(x=>Random.nextGaussian)
> val ngauss_rdd = sc.parallelize(ngauss)
> ngauss_rdd.count // 1 million
> ngauss_rdd.partitions.size // 4
> val ngauss_rdd2 = ngauss_rdd.filter(x=>x > 4.0)
> ngauss_rdd2.count // 35
> ngauss_rdd2.partitions.size // 4
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


-- 
---
Takeshi Yamamuro


Re: Apache Spark example split/merge shards

2017-01-16 Thread Takeshi Yamamuro
Hi,

It seems you hit this issue:
https://issues.apache.org/jira/browse/SPARK-18020

// maropu

On Tue, Jan 17, 2017 at 11:51 AM, noppanit  wrote:

> I'm totally new to Spark and I'm trying to learn from the example. I'm
> following this example
>
> https://github.com/apache/spark/blob/master/external/
> kinesis-asl/src/main/scala/org/apache/spark/examples/
> streaming/KinesisWordCountASL.scala.
>
> It works well. But I do have one question. Every time I split/merge Kinesis
> shards from the Amazon API, I have to restart the application. Is there a
> way for Spark application to automatically rebalance or to notify the
> application that Kinesis shards have been split or merged? I have to
> restart
> the application so I can see more shards in DynamoDB.
>
>
>
> --
> View this message in context: http://apache-spark-user-list.
> 1001560.n3.nabble.com/Apache-Spark-example-split-merge-shards-tp28311.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


-- 
---
Takeshi Yamamuro


partition size inherited from parent: auto coalesce

2017-01-16 Thread Suzen, Mehmet
Hello List,

 I was wondering what the design principle is whereby the partition size of
an RDD is inherited from the parent.  See one simple example below
[*]. 'ngauss_rdd2' has significantly less data; intuitively, in such
cases, shouldn't Spark invoke coalesce automatically for performance?
What would be the configuration option for this, if there is any?

Best,
-m

[*]
// Generate 1 million Gaussian random numbers
import util.Random
Random.setSeed(4242)
val ngauss = (1 to 1e6.toInt).map(x=>Random.nextGaussian)
val ngauss_rdd = sc.parallelize(ngauss)
ngauss_rdd.count // 1 million
ngauss_rdd.partitions.size // 4
val ngauss_rdd2 = ngauss_rdd.filter(x=>x > 4.0)
ngauss_rdd2.count // 35
ngauss_rdd2.partitions.size // 4

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Apache Spark example split/merge shards

2017-01-16 Thread noppanit
I'm totally new to Spark and I'm trying to learn from the example. I'm
following this example 

https://github.com/apache/spark/blob/master/external/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala.

It works well. But I do have one question. Every time I split/merge Kinesis
shards from the Amazon API, I have to restart the application. Is there a
way for Spark application to automatically rebalance or to notify the
application that Kinesis shards have been split or merged? I have to restart
the application so I can see more shards in DynamoDB. 



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Apache-Spark-example-split-merge-shards-tp28311.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



how to use newAPIHadoopFile

2017-01-16 Thread lk_spark
hi all,
I have a test with spark 2.0:

I have a test file whose field delimiter is \t:
kevin 30 2016
shen 30 2016
kai 33 2016
wei 30 2016
after using:
var datas: RDD[(LongWritable, String)] =
  sc.newAPIHadoopFile(inputPath + filename, classOf[TextInputFormat],
      classOf[LongWritable], classOf[Text], hadoopConf)
    .map { case (key, value) =>
      (key, new String(value.getBytes, decode))
    }
and when I save the RDD to HDFS I get this:
(0,kevin 30 2016)
(14,shen 30 20166)
(27,kai 33 201666)
(39,wei 30 201666)
It looks like the reader, after reading a line, didn't clean its buffer or
something?
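
A likely cause (an assumption based on how Hadoop's record readers behave, not something
verified against this exact job): TextInputFormat reuses the same Text object for every
record, and value.getBytes returns the whole reused backing array, which can still hold
trailing bytes from a previous, longer line. Only the first value.getLength bytes are
valid, so a sketch of the adjusted map would be:

var datas: RDD[(LongWritable, String)] =
  sc.newAPIHadoopFile(inputPath + filename, classOf[TextInputFormat],
      classOf[LongWritable], classOf[Text], hadoopConf)
    .map { case (key, value) =>
      // copy only the valid portion of the reused Text buffer
      (key, new String(value.getBytes, 0, value.getLength, decode))
    }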

2017-01-17


lk_spark 

Re: filter rows by all columns

2017-01-16 Thread Hyukjin Kwon
Hi Shawn,

Could we do this as below?

For "any of them is true":

scala> val df = spark.range(10).selectExpr("id as a", "id / 2 as b")
df: org.apache.spark.sql.DataFrame = [a: bigint, b: double]

scala> df.filter(_.toSeq.exists(v => v == 1)).show()
+---+---+
|  a|  b|
+---+---+
|  1|0.5|
|  2|1.0|
+---+---+


Or, for "all of them are true":

scala> val df = spark.range(10).selectExpr("id as a", "id / 2 as b")
df: org.apache.spark.sql.DataFrame = [a: bigint, b: double]

scala> df.filter(_.toSeq.forall(v => v == 0)).show()
+---+---+
|  a|  b|
+---+---+
|  0|0.0|
+---+---+






2017-01-17 7:27 GMT+09:00 Shawn Wan :

> I need to filter out outliers from a dataframe by all columns. I can
> manually list all columns like:
>
> df.filter(x=>math.abs(x.get(0).toString().toDouble-means(0))<=3*stddevs(0))
>
>   .filter(x=>math.abs(x.get(1).toString().toDouble-means(1))<=3*stddevs(1))
>
> ...
>
> But I want to turn it into a general function which can handle variable
> number of columns. How could I do that? Thanks in advance!
>
>
> Regards,
>
> Shawn
>
> --
> View this message in context: filter rows by all columns
> 
> Sent from the Apache Spark User List mailing list archive
>  at Nabble.com.
>



Metadata is not propagating with Dataset.map()

2017-01-16 Thread Matthew Tovbin
Hello,

It seems that metadata is not propagating when using Dataset.map(). Is
there a workaround?

Below are the steps to reproduce:

import spark.implicits._
val columnName = "col1"
val meta = new MetadataBuilder().putString("foo", "bar").build()
val schema = StructType(Array(StructField(columnName, DoubleType, true,
metadata = meta)))
def printSchema(d: Dataset[_]) = {
d.printSchema()
d.schema.fields.foreach(field => println("metadata for '" + field.name +
"': " + field.metadata.json))
}
val rows = spark.sparkContext.parallelize(Seq(1.0, 5.0, 3.0, 2.0, 6.0,
null).map(Row(_)))
val df = spark.createDataFrame(rows, schema)
printSchema(df) // metadata printed correctly
printSchema(df.select(columnName)) // metadata printed correctly
printSchema(df.map(r => r.getDouble(0))) // metadata is missing
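
A possible workaround rather than a fix (a sketch; it assumes re-attaching the metadata
after the fact is acceptable): since map() goes through an encoder and rebuilds the schema
from scratch, re-apply the metadata afterwards with Column.as(alias, metadata):

val mapped = df.map(r => r.getDouble(0)).toDF(columnName)
// re-attach the original metadata to the rebuilt column
printSchema(mapped.select(mapped(columnName).as(columnName, meta))) // metadata printed again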


Thank you,


-Matthew


Spark vs MongoDB: saving DataFrame to db raises missing database name exception

2017-01-16 Thread Marco Mistroni
hi all
 I have the following snippet which loads a dataframe from a csv file and
tries to save
it to mongodb.
For some reason, the MongoSpark.save method raises the following exception

Exception in thread "main" java.lang.IllegalArgumentException: Missing
database name. Set via the 'spark.mongodb.output.uri' or
'spark.mongodb.output.database' property
at
com.mongodb.spark.config.MongoCompanionConfig$class.databaseName(MongoCompanionConfig.scala:260)
at
com.mongodb.spark.config.WriteConfig$.databaseName(WriteConfig.scala:36)

Which is bizarre as I'm pretty sure I am setting all the necessary
properties in the SparkConf

could you kindly assist?

I am running Spark 2.0.1 locally with a local mongodb instance running at
127.0.0.1:27017
I am using version 2.0.0 of mongo-spark-connector
I am running on Scala 2.11

kr

val spark = SparkSession
 .builder()
 .master("local")
 .appName("Spark Mongo Example")
 .getOrCreate()
spark.conf.set("spark.mongodb.input.uri", "mongodb://127.0.0.1:27017/")
spark.conf.set("spark.mongodb.output.uri", "mongodb://127.0.0.1:27017/")
spark.conf.set("spark.mongodb.output.database", "test")

println(s"SparkPRoperties:${spark.conf.getAll}")


val df = getDataFrame(spark) // Loading any dataframe from a file

df.printSchema()

println(s"Head:${df.head()}")
println(s"Count:${df.count()}")
println("##  SAVING TO MONGODB #")
import com.mongodb.spark.config._

import com.mongodb.spark.config._

val writeConfig = WriteConfig(Map("collection" -> "spark",
"writeConcern.w" -> "majority"), Some(WriteConfig(spark.sparkContext)))
MongoSpark.save(df, writeConfig)
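
One possible explanation (an assumption on my part, not verified against the connector
source) is that WriteConfig(spark.sparkContext) reads the SparkContext's SparkConf, which
spark.conf.set() on an already-built session does not update. A sketch of setting the
properties when the session is built instead, with the database and collection embedded
in the URIs ("test" and "spark" below mirror the values used above):

val spark = SparkSession
 .builder()
 .master("local")
 .appName("Spark Mongo Example")
 .config("spark.mongodb.input.uri", "mongodb://127.0.0.1:27017/test.spark")
 .config("spark.mongodb.output.uri", "mongodb://127.0.0.1:27017/test.spark")
 .getOrCreate()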


Metadata is not propagating with Dataset.map()

2017-01-16 Thread tovbinm
Hello,

It seems that metadata is not propagating when using Dataset.map(). Is there
a workaround?

Below are the steps to reproduce:

import spark.implicits._
val columnName = "col1"
val meta = new MetadataBuilder().putString("foo", "bar").build()
val schema = StructType(Array(StructField(columnName, DoubleType, true,
metadata = meta)))
def printSchema(d: Dataset[_]) = {
d.printSchema()
d.schema.fields.foreach(field => println("metadata for '" + field.name +
"': " + field.metadata.json))
}
val rows = spark.sparkContext.parallelize(Seq(1.0, 5.0, 3.0, 2.0, 6.0,
null).map(Row(_)))
val df = spark.createDataFrame(rows, schema)
printSchema(df) // metadata printed correctly
printSchema(df.select(columnName)) // metadata printed correctly
printSchema(df.map(r => r.getDouble(0))) // metadata is missing


Thank you,

-Matthew



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Metadata-is-not-propagating-with-Dataset-map-tp28310.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



mapPartition iterator

2017-01-16 Thread AnilKumar B
Hi

For my use case, I need to call a third party function(which is in memory
based) for each complete partition data. So I am partitioning RDD logically
using repartition on index column and applying function f  on
mapPartitions(f).

When, I iterate through mapPartition iterator. Can, I assume one task will
only processes one particular partition's complete data(assuming this is
small in size)?

Or to achieve this, do I need to use glom() on repartition? instead of
mapPartitions?

And when exactly, I should use preservesPartitioning=true on mapPartitions?

Thanks & Regards,
B Anil Kumar.


filter rows by all columns

2017-01-16 Thread Shawn Wan
I need to filter out outliers from a dataframe by all columns. I can
manually list all columns like:

df.filter(x=>math.abs(x.get(0).toString().toDouble-means(0))<=3*stddevs(0))

  .filter(x=>math.abs(x.get(1).toString().toDouble-means(1))<=3*stddevs(1))

...

But I want to turn it into a general function which can handle variable
number of columns. How could I do that? Thanks in advance!
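
One way to generalize this over an arbitrary number of columns is to fold the filter
across the column indices. A sketch, assuming means(i) and stddevs(i) are indexed in the
same order as df's columns:

// apply the 3-sigma filter once per column, threading the DataFrame through a fold
val filtered = df.columns.indices.foldLeft(df) { (acc, i) =>
  acc.filter(x => math.abs(x.get(i).toString.toDouble - means(i)) <= 3 * stddevs(i))
}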


Regards,

Shawn




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/filter-rows-by-all-columns-tp28309.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Re: Spark 2.0 vs MongoDb /Cannot find dependency using sbt

2017-01-16 Thread Marco Mistroni
sorry, I should have done more research before jumping to the list.
The version of the connector is 2.0.0, available from the Maven repos.

sorry
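
For reference, a dependency line that should resolve the released connector from Maven
Central (assuming Scala 2.11, to match the scalaVersion below) looks like:

libraryDependencies += "org.mongodb.spark" %% "mongo-spark-connector" % "2.0.0"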

On Mon, Jan 16, 2017 at 9:32 PM, Marco Mistroni  wrote:

> HI all
> in searching on how to use Spark 2.0 with mongo i came across this link
>
> https://jira.mongodb.org/browse/SPARK-20
>
> i amended my build.sbt (content below), however the mongodb dependency was
> not found
> Could anyone assist?
>
> kr
>  marco
>
> name := "SparkExamples"
> version := "1.0"
> scalaVersion := "2.11.8"
> val sparkVersion = "2.0.1"
>
> // Add a single dependency
> libraryDependencies += "junit" % "junit" % "4.8" % "test"
> libraryDependencies ++= Seq("org.slf4j" % "slf4j-api" % "1.7.5",
> "org.slf4j" % "slf4j-simple" % "1.7.5",
> "org.clapper" %% "grizzled-slf4j" % "1.0.2")
> libraryDependencies += "org.apache.spark"%%"spark-core"   % sparkVersion
> libraryDependencies += "org.apache.spark"%%"spark-streaming"   %
> sparkVersion
> libraryDependencies += "org.apache.spark"%%"spark-mllib"   % sparkVersion
> libraryDependencies += "org.apache.spark"%%"spark-streaming-flume-sink" %
> sparkVersion
> libraryDependencies += "org.apache.spark"%%"spark-sql"   % sparkVersion
> libraryDependencies += "org.mongodb.spark" % "mongo-spark-connector_2.10"
> % "2.0.0-SNAPSHOT"
>
> resolvers += "MavenRepository" at "https://mvnrepository.com/"
>
>


Spark 2.0 vs MongoDb /Cannot find dependency using sbt

2017-01-16 Thread Marco Mistroni
HI all
in searching on how to use Spark 2.0 with mongo i came across this link

https://jira.mongodb.org/browse/SPARK-20

i amended my build.sbt (content below), however the mongodb dependency was
not found
Could anyone assist?

kr
 marco

name := "SparkExamples"
version := "1.0"
scalaVersion := "2.11.8"
val sparkVersion = "2.0.1"

// Add a single dependency
libraryDependencies += "junit" % "junit" % "4.8" % "test"
libraryDependencies ++= Seq("org.slf4j" % "slf4j-api" % "1.7.5",
"org.slf4j" % "slf4j-simple" % "1.7.5",
"org.clapper" %% "grizzled-slf4j" % "1.0.2")
libraryDependencies += "org.apache.spark"%%"spark-core"   % sparkVersion
libraryDependencies += "org.apache.spark"%%"spark-streaming"   %
sparkVersion
libraryDependencies += "org.apache.spark"%%"spark-mllib"   % sparkVersion
libraryDependencies += "org.apache.spark"%%"spark-streaming-flume-sink" %
sparkVersion
libraryDependencies += "org.apache.spark"%%"spark-sql"   % sparkVersion
libraryDependencies += "org.mongodb.spark" % "mongo-spark-connector_2.10" %
"2.0.0-SNAPSHOT"

resolvers += "MavenRepository" at "https://mvnrepository.com/"


Re: Running Spark on EMR

2017-01-16 Thread Everett Anderson
On Sun, Jan 15, 2017 at 11:09 AM, Andrew Holway <
andrew.hol...@otternetworks.de> wrote:

> use yarn :)
>
> "spark-submit --master yarn"
>

Doesn't this require first copying out various Hadoop configuration XML
files from the EMR master node to the machine running the spark-submit? Or
is there a well-known minimal set of host/port options to avoid that?

I'm currently copying out several XML files and using them on a client
running spark-submit, but I feel uneasy about this as it seems like the
local values override values on the cluster at runtime -- they're copied up
with the job.




>
>
> On Sun, Jan 15, 2017 at 7:55 PM, Darren Govoni 
> wrote:
>
>> So what was the answer?
>>
>>
>>
>> Sent from my Verizon, Samsung Galaxy smartphone
>>
>>  Original message 
>> From: Andrew Holway 
>> Date: 1/15/17 11:37 AM (GMT-05:00)
>> To: Marco Mistroni 
>> Cc: Neil Jonkers , User 
>> Subject: Re: Running Spark on EMR
>>
>> Darn. I didn't respond to the list. Sorry.
>>
>>
>>
>> On Sun, Jan 15, 2017 at 5:29 PM, Marco Mistroni 
>> wrote:
>>
>>> thanks Neil. I followed original suggestion from Andrw and everything is
>>> working fine now
>>> kr
>>>
>>> On Sun, Jan 15, 2017 at 4:27 PM, Neil Jonkers 
>>> wrote:
>>>
 Hello,

 Can you drop the url:

  spark://master:7077

 The url is used when running Spark in standalone mode.

 Regards


  Original message 
 From: Marco Mistroni
 Date:15/01/2017 16:34 (GMT+02:00)
 To: User
 Subject: Running Spark on EMR

 hi all
  could anyone assist here?
 i am trying to run spark 2.0.0 on an EMR cluster,but i am having issues
 connecting to the master node
 So, below is a snippet of what i am doing


 sc = SparkSession.builder.master(sparkHost).appName("DataProcess"
 ).getOrCreate()

 sparkHost is passed as input parameter. that was thought so that i can
 run the script locally
 on my spark local instance as well as submitting scripts on any cluster
 i want


 Now i have
 1 - setup a cluster on EMR.
 2 - connected to masternode
 3  - launch the command spark-submit myscripts.py spark://master:7077

 But that results in an connection refused exception
 Then i have tried to remove the .master call above and launch the
 script with the following command

 spark-submit --master spark://master:7077   myscript.py  but still i
 am getting
 connectionREfused exception


 I am using Spark 2.0.0 , could anyone advise on how shall i build the
 spark session and how can i submit a pythjon script to the cluster?

 kr
  marco

>>>
>>>
>>
>>
>> --
>> Otter Networks UG
>> http://otternetworks.de
>> Gotenstraße 17
>> 10829 Berlin
>>
>
>
>
> --
> Otter Networks UG
> http://otternetworks.de
> Gotenstraße 17
> 10829 Berlin
>


Re: Equally split a RDD partition into two partition at the same node

2017-01-16 Thread Fei Hu
Hi Pradeep,

That is a good idea. My customized RDDs are similar to the NewHadoopRDD. If
we have billions of InputSplits, will performance be bottlenecked? That is,
will too much data need to be transferred from the master node to the
computing nodes over the network?

Thanks,
Fei

On Mon, Jan 16, 2017 at 2:07 PM, Pradeep Gollakota 
wrote:

> Usually this kind of thing can be done at a lower level in the InputFormat,
> by specifying the max split size. Have you looked into that
> possibility with your InputFormat?
>
> On Sun, Jan 15, 2017 at 9:42 PM, Fei Hu  wrote:
>
>> Hi Jasbir,
>>
>> Yes, you are right. Do you have any idea about my question?
>>
>> Thanks,
>> Fei
>>
>> On Mon, Jan 16, 2017 at 12:37 AM,  wrote:
>>
>>> Hi,
>>>
>>>
>>>
>>> Coalesce is used to decrease the number of partitions. If you give the
>>> value of numPartitions greater than the current partition, I don’t think
>>> RDD number of partitions will be increased.
>>>
>>>
>>>
>>> Thanks,
>>>
>>> Jasbir
>>>
>>>
>>>
>>> *From:* Fei Hu [mailto:hufe...@gmail.com]
>>> *Sent:* Sunday, January 15, 2017 10:10 PM
>>> *To:* zouz...@cs.toronto.edu
>>> *Cc:* user @spark ; d...@spark.apache.org
>>> *Subject:* Re: Equally split a RDD partition into two partition at the
>>> same node
>>>
>>>
>>>
>>> Hi Anastasios,
>>>
>>>
>>>
>>> Thanks for your reply. If I just increase the numPartitions to be twice
>>> larger, how coalesce(numPartitions: Int, shuffle: Boolean = false)
>>> keeps the data locality? Do I need to define my own Partitioner?
>>>
>>>
>>>
>>> Thanks,
>>>
>>> Fei
>>>
>>>
>>>
>>> On Sun, Jan 15, 2017 at 3:58 AM, Anastasios Zouzias 
>>> wrote:
>>>
>>> Hi Fei,
>>>
>>>
>>>
>>> How you tried coalesce(numPartitions: Int, shuffle: Boolean = false) ?
>>>
>>>
>>>
>>> https://github.com/apache/spark/blob/branch-1.6/core/src/mai
>>> n/scala/org/apache/spark/rdd/RDD.scala#L395
>>> 
>>>
>>>
>>>
>>> coalesce is mostly used for reducing the number of partitions before
>>> writing to HDFS, but it might still be a narrow dependency (satisfying your
>>> requirements) if you increase the # of partitions.
>>>
>>>
>>>
>>> Best,
>>>
>>> Anastasios
>>>
>>>
>>>
>>> On Sun, Jan 15, 2017 at 12:58 AM, Fei Hu  wrote:
>>>
>>> Dear all,
>>>
>>>
>>>
>>> I want to equally divide a RDD partition into two partitions. That
>>> means, the first half of elements in the partition will create a new
>>> partition, and the second half of elements in the partition will generate
>>> another new partition. But the two new partitions are required to be at the
>>> same node with their parent partition, which can help get high data
>>> locality.
>>>
>>>
>>>
>>> Is there anyone who knows how to implement it or any hints for it?
>>>
>>>
>>>
>>> Thanks in advance,
>>>
>>> Fei
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>> --
>>>
>>> -- Anastasios Zouzias
>>>
>>>
>>>
>>> --
>>>
>>> This message is for the designated recipient only and may contain
>>> privileged, proprietary, or otherwise confidential information. If you have
>>> received it in error, please notify the sender immediately and delete the
>>> original. Any other use of the e-mail by you is prohibited. Where allowed
>>> by local law, electronic communications with Accenture and its affiliates,
>>> including e-mail and instant messaging (including content), may be scanned
>>> by our systems for the purposes of information security and assessment of
>>> internal compliance with Accenture policy.
>>> 
>>> __
>>>
>>> www.accenture.com
>>>
>>
>>
>


Re: groupByKey vs mapPartitions for efficient grouping within a Partition

2017-01-16 Thread Andy Dang
groupByKey() is a wide dependency and will cause a full shuffle. Using this
transformation is advised against unless your keys are balanced
(well-distributed) and you need a full shuffle.

Otherwise, what you want is aggregateByKey() or reduceByKey() (depending on
the output). These operations are backed by combineByKey(), which can perform
map-side aggregation.
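
A minimal sketch of the difference, with made-up (key, value) pairs:

val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))

// reduceByKey combines values within each partition before the shuffle
val summedPerKey = pairs.reduceByKey(_ + _)

// groupByKey ships every raw value across the network, then groups
val groupedPerKey = pairs.groupByKey().mapValues(_.sum)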

---
Regards,
Andy

On Mon, Jan 16, 2017 at 2:21 PM, Patrick  wrote:

> Hi,
>
> Does groupByKey have intelligence associated with it, such that if all the
> keys reside in the same partition, it should not do the shuffle?
>
> Or should the user write mapPartitions (Scala groupBy code)?
>
> Which would be more efficient and what are the memory considerations?
>
>
> Thanks
>
>
>
>


Re: Spark streaming app that processes Kafka DStreams produces no output and no error

2017-01-16 Thread shyla deshpande
Hello,
I checked the log file on the worker node and don't see any error there.
This is the first time I have been asked to run on such a small cluster. I feel
it's a resources issue, but it would be a great help if somebody could confirm
this or share your experience. Thanks

On Sat, Jan 14, 2017 at 4:01 PM, shyla deshpande 
wrote:

> Hello,
>
> I want to add that,
> I don't even see the streaming tab in the application UI on port 4040 when
> I run it on the cluster.
> The cluster on EC2  has 1 master node and 1 worker node.
> The cores used on the worker node is 2 of 2 and memory used is 6GB of
> 6.3GB.
>
> Can I run a spark streaming job with just 2 cores?
>
> Appreciate your time and help.
>
> Thanks
>
>
>
>
>
> On Fri, Jan 13, 2017 at 10:46 PM, shyla deshpande <
> deshpandesh...@gmail.com> wrote:
>
>> Hello,
>>
>> My spark streaming app that reads kafka topics and prints the DStream
>> works fine on my laptop, but on AWS cluster it produces no output and no
>> errors.
>>
>> Please help me debug.
>>
>> I am using Spark 2.0.2 and kafka-0-10
>>
>> Thanks
>>
>> The following is the output of the spark streaming app...
>>
>>
>> 17/01/14 06:22:41 WARN NativeCodeLoader: Unable to load native-hadoop 
>> library for your platform... using builtin-java classes where applicable
>> 17/01/14 06:22:43 WARN Checkpoint: Checkpoint directory check1 does not exist
>> Creating new context
>> 17/01/14 06:22:45 WARN SparkContext: Use an existing SparkContext, some 
>> configuration may not take effect.
>> 17/01/14 06:22:45 WARN KafkaUtils: overriding enable.auto.commit to false 
>> for executor
>> 17/01/14 06:22:45 WARN KafkaUtils: overriding auto.offset.reset to none for 
>> executor
>> 17/01/14 06:22:45 WARN KafkaUtils: overriding executor group.id to 
>> spark-executor-whilDataStream
>> 17/01/14 06:22:45 WARN KafkaUtils: overriding receive.buffer.bytes to 65536 
>> see KAFKA-3135
>>
>>
>>
>


Re: Old version of Spark [v1.2.0]

2017-01-16 Thread Debasish Das
You may want to pull up the release/1.2 branch and the 1.2.0 tag to build it
yourself in case the packages are not available.
On Jan 15, 2017 2:55 PM, "Md. Rezaul Karim" 
wrote:

> Hi Ayan,
>
> Thanks a million.
>
> Regards,
> _
> *Md. Rezaul Karim*, BSc, MSc
> PhD Researcher, INSIGHT Centre for Data Analytics
> National University of Ireland, Galway
> IDA Business Park, Dangan, Galway, Ireland
> Web: http://www.reza-analytics.eu/index.html
> 
>
> On 15 January 2017 at 22:48, ayan guha  wrote:
>
>> archive.apache.org will always have all the releases:
>> http://archive.apache.org/dist/spark/
>>
>> @Spark users: it may be a good idea to have a "To download older
>> versions, click here" link to Spark Download page?
>>
>> On Mon, Jan 16, 2017 at 8:16 AM, Md. Rezaul Karim <
>> rezaul.ka...@insight-centre.org> wrote:
>>
>>> Hi,
>>>
>>> I am looking for Spark 1.2.0 version. I tried to download in the Spark
>>> website but it's no longer available.
>>>
>>> Any suggestion?
>>>
>>>
>>>
>>>
>>>
>>>
>>> Regards,
>>> _
>>> *Md. Rezaul Karim*, BSc, MSc
>>> PhD Researcher, INSIGHT Centre for Data Analytics
>>> National University of Ireland, Galway
>>> IDA Business Park, Dangan, Galway, Ireland
>>> Web: http://www.reza-analytics.eu/index.html
>>> 
>>>
>>
>>
>>
>> --
>> Best Regards,
>> Ayan Guha
>>
>
>


Re: Importing a github project on sbt

2017-01-16 Thread Marco Mistroni
Uhm... not a Spark issue... Anyway... I had similar issues with sbt.
The quick solution to get you going is to place your dependency in your lib folder.
The not-so-quick one is to build the sbt dependency and do a sbt publish-local,
or deploy local.
But I consider both approaches hacks.
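
A sketch of the publish-local route (the coordinates below are placeholders; use whatever
organization, name and version the aws-stepfunctions build.sbt actually declares after
running `sbt publishLocal` in its checkout):

// in the consuming project's build.sbt, instead of the RootProject reference:
libraryDependencies += "io.github.oleber" %% "aws-stepfunctions" % "0.1-SNAPSHOT" // placeholder coordinates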
Hth

On 16 Jan 2017 2:00 pm, "marcos rebelo"  wrote:

Hi all,

I have this project:
https://github.com/oleber/aws-stepfunctions


I have a second project that should import the first one. On the second
project I did something like:

lazy val awsStepFunctions = RootProject(uri("git://github.com/oleber/aws-stepfunctions.git#31990fce907cbda3814954c390dcbc1b7807b2d5"))

lazy val importerWithStepFunction = project.in(file("modules/importerWithStepFunction"))
  .settings(global: _*)
  .dependsOn(
awsStepFunctions % allScopes,
commonCommons % allScopes,
home24AWS % allScopes,
importerBing % allScopes
  )


and I get an error like:

[warn] ::
[warn] ::  UNRESOLVED DEPENDENCIES ::
[warn] ::
[warn] :: default#aws-stepfunctions_2.11;1.0: not found
[warn] ::
[warn]
[warn] Note: Unresolved dependencies path:
[warn] default:aws-stepfunctions_2.11:1.0
[warn]   +- de.home24:importerwithstepfunction_2.11:0.1-SNAPSHOT


Clearly I'm missing something. Can you direct me to the solution or to
documentation? I will work something out.

Best Regards
Marcos Rebelo


groupByKey vs mapPartitions for efficient grouping within a Partition

2017-01-16 Thread Patrick
Hi,

Does groupByKey have intelligence associated with it, such that if all the
keys reside in the same partition, it should not do the shuffle?

Or should the user write mapPartitions (Scala groupBy code)?

Which would be more efficient and what are the memory considerations?


Thanks


Importing a github project on sbt

2017-01-16 Thread marcos rebelo
Hi all,

I have this project:
https://github.com/oleber/aws-stepfunctions


I have a second project that should import the first one. On the second
project I did something like:

lazy val awsStepFunctions = RootProject(uri("git://github.com/oleber/aws-stepfunctions.git#31990fce907cbda3814954c390dcbc1b7807b2d5"))

lazy val importerWithStepFunction = project.in(file("modules/importerWithStepFunction"))
  .settings(global: _*)
  .dependsOn(
awsStepFunctions % allScopes,
commonCommons % allScopes,
home24AWS % allScopes,
importerBing % allScopes
  )


and I get an error like:

[warn] ::
[warn] ::  UNRESOLVED DEPENDENCIES ::
[warn] ::
[warn] :: default#aws-stepfunctions_2.11;1.0: not found
[warn] ::
[warn]
[warn] Note: Unresolved dependencies path:
[warn] default:aws-stepfunctions_2.11:1.0
[warn]   +- de.home24:importerwithstepfunction_2.11:0.1-SNAPSHOT


Clearly I'm missing something. Can you direct me to the solution or to
documentation? I will work something out.

Best Regards
Marcos Rebelo


Re: Flume and Spark Streaming

2017-01-16 Thread Guillermo Ortiz
Avro sink --> Spark Streaming

2017-01-16 13:55 GMT+01:00 ayan guha :

> With Flume, what would be your sink?
>
>
>
> On Mon, Jan 16, 2017 at 10:44 PM, Guillermo Ortiz 
> wrote:
>
>> I'm wondering to use Flume (channel file)-Spark Streaming.
>>
>> I have some doubts about it:
>>
>> 1. The RDD size is all the data that comes in the microbatch which you have
>> defined. Right?
>>
>> 2.If there are 2Gb of data, how many are RDDs generated? just one and I
>> have to make a repartition?
>>
>> 3.When is the ACK sent back  from Spark to Flume?
>>   I guess that if Flume dies, Flume is going to send the same data again
>> to Spark
>>   If Spark dies, I have no idea if Spark is going to reprocessing same
>> data again when it is sent again.
>>   Could it be different if I use the Kafka Channel?
>>
>>
>>
>>
>>
>
>
> --
> Best Regards,
> Ayan Guha
>


Re: Flume and Spark Streaming

2017-01-16 Thread ayan guha
With Flume, what would be your sink?



On Mon, Jan 16, 2017 at 10:44 PM, Guillermo Ortiz 
wrote:

> I'm wondering to use Flume (channel file)-Spark Streaming.
>
> I have some doubts about it:
>
> 1. The RDD size is all the data that comes in the microbatch which you have
> defined. Right?
>
> 2.If there are 2Gb of data, how many are RDDs generated? just one and I
> have to make a repartition?
>
> 3.When is the ACK sent back  from Spark to Flume?
>   I guess that if Flume dies, Flume is going to send the same data again
> to Spark
>   If Spark dies, I have no idea if Spark is going to reprocessing same
> data again when it is sent again.
>   Could it be different if I use the Kafka Channel?
>
>
>
>
>


-- 
Best Regards,
Ayan Guha


Flume and Spark Streaming

2017-01-16 Thread Guillermo Ortiz
I'm wondering about using Flume (file channel) with Spark Streaming.

I have some doubts about it:

1. The RDD size is all the data that comes in the microbatch which you have
defined. Right?

2. If there are 2 GB of data, how many RDDs are generated? Just one, and I
have to make a repartition?

3.When is the ACK sent back  from Spark to Flume?
  I guess that if Flume dies, Flume is going to send the same data again to
Spark
  If Spark dies, I have no idea if Spark is going to reprocess the same data
again when it is sent again.
  Could it be different if I use the Kafka Channel?


Re: ML PIC

2017-01-16 Thread Nick Pentreath
The JIRA for this is here: https://issues.apache.org/jira/browse/SPARK-15784

There is a PR open already for it, which still needs to be reviewed.



On Wed, 21 Dec 2016 at 18:01 Robert Hamilton 
wrote:

> Thank you Nick that is good to know.
>
> Would this have some opportunity for newbs (like me) to volunteer some
> time?
>
> Sent from my iPhone
>
> On Dec 21, 2016, at 9:08 AM, Nick Pentreath 
> wrote:
>
> It is part of the general feature parity roadmap. I can't recall offhand
> any blocker reasons; it's just a matter of resources.
> On Wed, 21 Dec 2016 at 17:05, Robert Hamilton <
> robert_b_hamil...@icloud.com> wrote:
>
> Hi all.  Is it on the roadmap to have an
> Spark.ml.clustering.PowerIterationClustering? Are there technical reasons
> that there is currently only an .mllib version?
>
>
> Sent from my iPhone
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: Old version of Spark [v1.2.0]

2017-01-16 Thread Jacek Laskowski
Hi Ayan,

Although my first reaction was "Why would anyone ever want to download
older versions?", after thinking about it briefly I concluded that there
might be merit in having it.

Could you please file an issue in the issue tracker ->
https://issues.apache.org/jira/browse/SPARK?

Pozdrawiam,
Jacek Laskowski

https://medium.com/@jaceklaskowski/
Mastering Apache Spark 2.0 https://bit.ly/mastering-apache-spark
Follow me at https://twitter.com/jaceklaskowski


On Sun, Jan 15, 2017 at 11:48 PM, ayan guha  wrote:
> archive.apache.org will always have all the releases:
> http://archive.apache.org/dist/spark/
>
> @Spark users: it may be a good idea to have a "To download older versions,
> click here" link to Spark Download page?
>
> On Mon, Jan 16, 2017 at 8:16 AM, Md. Rezaul Karim
>  wrote:
>>
>> Hi,
>>
>> I am looking for Spark 1.2.0 version. I tried to download in the Spark
>> website but it's no longer available.
>>
>> Any suggestion?
>>
>>
>>
>>
>>
>>
>> Regards,
>> _
>> Md. Rezaul Karim, BSc, MSc
>> PhD Researcher, INSIGHT Centre for Data Analytics
>> National University of Ireland, Galway
>> IDA Business Park, Dangan, Galway, Ireland
>> Web: http://www.reza-analytics.eu/index.html
>
>
>
>
> --
> Best Regards,
> Ayan Guha

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



[Spark DataFrames/Streaming]: Bad performance with window function in streaming job

2017-01-16 Thread Julian Keppel
Hi,

I use Spark 2.0.2 and want to do the following:

I extract features in a streaming job and then apply the records to a
k-means model. Some of the features are simple ones which are calculated
directly from the record. But I also have more complex features which
depend on records from a specified time window before. They count how many
connections in the last second were to the same host or service as the
current one. I decided to use the SQL window functions for this.

So I build window specifications:

val hostCountWindow = Window.partitionBy("plainrecord.ip_dst").orderBy(
desc("timestamp")).rangeBetween(-1L, 0L)
val serviceCountWindow = Window.partitionBy("service").
orderBy(desc("timestamp")).rangeBetween(-1L, 0L)

And a function which is called to extract this features on every batch:

def extractTrafficFeatures(dataset: Dataset[Row]) = {
  dataset
.withColumn("host_count", count(dataset("plainrecord.ip_
dst")).over(hostCountWindow))
.withColumn("srv_count", count(dataset("service")).
over(serviceCountWindow))
}

And use this function as follows

stream.map(...).map(...).foreachRDD { rdd =>
  val dataframe = rdd.toDF(featureHeaders: _*).transform(
extractTrafficFeatures(_))
  ...
}

The problem is that this performs very badly. A batch needs between
1 and 3 seconds for an average input rate of less than 100 records per
second. I guess it comes from the partitioning, which produces a lot of
shuffling? Is there a better way to calculate these features on the
streaming data? Or am I doing something wrong here?

Thank you for your help.


filter push down on har file

2017-01-16 Thread Yang Cao
Hi,

My team just archived last year's parquet files. I wonder whether the
filter push-down optimization still works when I read data through
"har:///path/to/data/"? THX.

Best,
-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org