Re: EC2 cluster created by spark using old HDFS 1.0

2015-03-22 Thread Akhil Das
That's a hadoop version incompatibility issue, you need to make sure
everything runs on the same version.

Thanks
Best Regards

On Sat, Mar 21, 2015 at 1:24 AM, morfious902002  wrote:

> Hi,
> I created a cluster using spark-ec2 script. But it installs HDFS version
> 1.0. I would like to use this cluster to connect to HIVE installed on a
> cloudera CDH 5.3 cluster. But I am getting the following error:-
>
> org.apache.hadoop.ipc.RemoteException: Server IPC version 9 cannot
> communicate with client vers
> ion 4
> at org.apache.hadoop.ipc.Client.call(Client.java:1070)
> at org.apache.hadoop.ipc.RPC$Invoker.invoke(RPC.java:225)
> at com.sun.proxy.$Proxy10.getProtocolVersion(Unknown Source)
> at org.apache.hadoop.ipc.RPC.getProxy(RPC.java:396)
> at org.apache.hadoop.ipc.RPC.getProxy(RPC.java:379)
> at
> org.apache.hadoop.hdfs.DFSClient.createRPCNamenode(DFSClient.java:119)
> at org.apache.hadoop.hdfs.DFSClient.(DFSClient.java:238)
> at org.apache.hadoop.hdfs.DFSClient.(DFSClient.java:203)
> at
>
> org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:8
> 9)
> at
> org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:1386)
> at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:66)
> at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:1404)
> at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:254)
> at org.apache.hadoop.fs.Path.getFileSystem(Path.java:187)
> at
>
> org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:176)
> at
>
> org.apache.hadoop.mapred.SequenceFileInputFormat.listStatus(SequenceFileInputFormat.
> java:40)
> at
>
> org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:208)
> at
> org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:203)
> at
> org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
> at
> org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
> at scala.Option.getOrElse(Option.scala:120)
> at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
> at
>
> org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
> at
> org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
> at
> org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
> at scala.Option.getOrElse(Option.scala:120)
> at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
> at
>
> org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
> at
> org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
> at
> org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
> at scala.Option.getOrElse(Option.scala:120)
> at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
> at
> org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:66)
> at
> org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:66)
> at
>
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> at
>
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> at scala.collection.immutable.List.foreach(List.scala:318)
> at
> scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
> at scala.collection.AbstractTraversable.map(Traversable.scala:105)
> at org.apache.spark.rdd.UnionRDD.getPartitions(UnionRDD.scala:66)
> at
> org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
> at
> org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
> at scala.Option.getOrElse(Option.scala:120)
> at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
> at
>
> org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
> at
> org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
> at
> org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
> at scala.Option.getOrElse(Option.scala:120)
> at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1511)
> at org.apache.spark.rdd.RDD.collect(RDD.scala:813)
> at
> org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:83)
> at org.apache.spark.sql.DataFrame.collect(DataFrame.scala:815)
> at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:23)
> at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:28)
> at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:30)
> at $iwC$$iwC$$iwC$$iwC$$iwC.(:32)
> at $iwC$$iwC$$iwC$$iwC.(:34)
> at $iwC$$iwC$$iwC.(:36)
> at $iwC$$iwC.(:38)
> at $iwC.(:40)
> at (:42)
> at .(:46)
> at .()
> 

Re: Buffering for Socket streams

2015-03-22 Thread Akhil Das
You can try playing with spark.streaming.blockInterval so that it wont
consume a lot of data, default value is 200ms

Thanks
Best Regards

On Fri, Mar 20, 2015 at 8:49 PM, jamborta  wrote:

> Hi all,
>
> We are designing a workflow where we try to stream local files to a Socket
> streamer, that would clean and process the files and write them to hdfs. We
> have an issue with bigger files when the streamer cannot keep up with the
> data, and runs out of memory.
>
> What would be the best way to implement an approach where the Socket stream
> receiver would notify the stream not to send more data (stop reading from
> disk too?), just before it might run out of memory?
>
> thanks,
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Buffering-for-Socket-streams-tp22164.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: How to handle under-performing nodes in the cluster

2015-03-22 Thread Akhil Das
It seems that node is not getting allocated with enough tasks, try
increasing your level of parallelism or do a manual repartition so that
everyone gets even tasks to operate on.

Thanks
Best Regards

On Fri, Mar 20, 2015 at 8:05 PM, Yiannis Gkoufas 
wrote:

> Hi all,
>
> I have 6 nodes in the cluster and one of the nodes is clearly
> under-performing:
>
>
> ​
> I was wandering what is the impact of having such issues? Also what is the
> recommended way to workaround it?
>
> Thanks a lot,
> Yiannis
>


Re: How to check that a dataset is sorted after it has been written out?

2015-03-22 Thread Akhil Das
One approach would be to repartition the whole data into 1 (costly
operation though, but will give you a single file). Also, You could try
using zipWithIndex before writing it out.

Thanks
Best Regards

On Sat, Mar 21, 2015 at 4:11 AM, Michael Albert <
m_albert...@yahoo.com.invalid> wrote:

> Greetings!
>
> I sorted a dataset in Spark and then wrote it out in avro/parquet.
>
> Then I wanted to check that it was sorted.
>
> It looks like each partition has been sorted, but when reading in, the
> first "partition" (i.e., as
> seen in the partition index of mapPartitionsWithIndex) is not the same  as
> implied by
> the names of the parquet files (even when the number of partitions is the
> same in the
> rdd which was read as on disk).
>
> If I "take()" a few hundred values, they are sorted, but they are *not*
> the same as if I
> explicitly open "part-r-0.parquet" and take values from that.
>
> It seems that when opening the rdd, the "partitions" of the rdd are not in
> the same
> order as implied by the data on disk (i.e., "part-r-0.parquet,
> part-r-1.parquet, etc).
>
> So, how might one read the data so that one maintains the sort order?
>
> And while on the subject, after the "terasort", how did they check that
> the
> data was actually sorted correctly? (or did they :-) ? ).
>
> Is there any way to read the data back in so as to preserve the sort, or
> do I need to
> "zipWithIndex" before writing it out, and write the index at that time? (I
> haven't tried the
> latter yet).
>
> Thanks!
> -Mike
>
>


Re: Spark streaming alerting

2015-03-22 Thread Akhil Das
What do you mean you can't send it directly from spark workers? Here's a
simple approach which you could do:

val data = ssc.textFileStream("sigmoid/")
val dist = data.filter(_.contains("ERROR")).foreachRDD(rdd =>
alert("Errors :" + rdd.count()))

And the alert() function could be anything triggering an email or sending
an SMS alert.

Thanks
Best Regards

On Sun, Mar 22, 2015 at 1:52 AM, Mohit Anchlia 
wrote:

> Is there a module in spark streaming that lets you listen to
> the alerts/conditions as they happen in the streaming module? Generally
> spark streaming components will execute on large set of clusters like hdfs
> or Cassandra, however when it comes to alerting you generally can't send it
> directly from the spark workers, which means you need a way to listen to
> the alerts.
>


Re: SocketTimeout only when launching lots of executors

2015-03-22 Thread Akhil Das
It seems your driver is getting flooded by those many executors and hence
it gets timeout. There are some configuration options like
spark.akka.timeout etc, you could try playing with those. More information
will be available here:
http://spark.apache.org/docs/latest/configuration.html

Thanks
Best Regards

On Mon, Mar 23, 2015 at 9:46 AM, Tianshuo Deng 
wrote:

> Hi, spark users.
>
> When running a spark application with lots of executors(300+), I see
> following failures:
>
> java.net.SocketTimeoutException: Read timed out  at
> java.net.SocketInputStream.socketRead0(Native Method)  at
> java.net.SocketInputStream.read(SocketInputStream.java:152)  at
> java.net.SocketInputStream.read(SocketInputStream.java:122)  at
> java.io.BufferedInputStream.fill(BufferedInputStream.java:235)  at
> java.io.BufferedInputStream.read1(BufferedInputStream.java:275)  at
> java.io.BufferedInputStream.read(BufferedInputStream.java:334)  at
> sun.net.www.http.HttpClient.parseHTTPHeader(HttpClient.java:690)  at
> sun.net.www.http.HttpClient.parseHTTP(HttpClient.java:633)  at
> sun.net.www.protocol.http.HttpURLConnection.getInputStream(HttpURLConnection.java:1324)
> at org.apache.spark.util.Utils$.doFetchFile(Utils.scala:583)  at
> org.apache.spark.util.Utils$.fetchFile(Utils.scala:421)  at
> org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$updateDependencies$6.apply(Executor.scala:356)
> at
> org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$updateDependencies$6.apply(Executor.scala:353)
> at
> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
> at
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
> at
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
> at
> scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
> at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
> at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)  at
> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
> at 
> org.apache.spark.executor.Executor.org$apache$spark$executor$Executor$$updateDependencies(Executor.scala:353)
> at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:181)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> at java.lang.Thread.run(Thread.java:745)
>
> When I reduce the number of executors, the spark app runs fine. From the
> stack trace, it looks like that multiple executors requesting downloading
> dependencies at the same time is causing driver to timeout?
>
> Anyone experienced similar issues or has any suggestions?
>
> Thanks
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: How to use DataFrame with MySQL

2015-03-22 Thread gavin zhang
OK,I found what the problem is: It couldn't work with mysql-connector-5.0.8.
I updated the connector version to 5.1.34 and it worked.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-use-DataFrame-with-MySQL-tp22178p22182.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark sql thrift server slower than hive

2015-03-22 Thread Arush Kharbanda
A basis change needed by spark is setting the executor memory which
defaults to 512MB by default.

On Mon, Mar 23, 2015 at 10:16 AM, Denny Lee  wrote:

> How are you running your spark instance out of curiosity?  Via YARN or
> standalone mode?  When connecting Spark thriftserver to the Spark service,
> have you allocated enough memory and CPU when executing with spark?
>
> On Sun, Mar 22, 2015 at 3:39 AM fanooos  wrote:
>
>> We have cloudera CDH 5.3 installed on one machine.
>>
>> We are trying to use spark sql thrift server to execute some analysis
>> queries against hive table.
>>
>> Without any changes in the configurations, we run the following query on
>> both hive and spark sql thrift server
>>
>> *select * from tableName;*
>>
>> The time taken by spark is larger than the time taken by hive which is not
>> supposed to be the like that.
>>
>> The hive table is mapped to json files stored on HDFS directory and we are
>> using *org.openx.data.jsonserde.JsonSerDe* for
>> serialization/deserialization.
>>
>> Why spark takes much more time to execute the query than hive ?
>>
>>
>>
>> --
>> View this message in context: http://apache-spark-user-list.
>> 1001560.n3.nabble.com/Spark-sql-thrift-server-slower-than-
>> hive-tp22177.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>


-- 

[image: Sigmoid Analytics] 

*Arush Kharbanda* || Technical Teamlead

ar...@sigmoidanalytics.com || www.sigmoidanalytics.com


Re: spark disk-to-disk

2015-03-22 Thread Reynold Xin
On Sun, Mar 22, 2015 at 6:03 PM, Koert Kuipers  wrote:

> so finally i can resort to:
> rdd.saveAsObjectFile(...)
> sc.objectFile(...)
> but that seems like a rather broken abstraction.
>
>
This seems like a fine solution to me.


Re: Spark sql thrift server slower than hive

2015-03-22 Thread Denny Lee
How are you running your spark instance out of curiosity?  Via YARN or
standalone mode?  When connecting Spark thriftserver to the Spark service,
have you allocated enough memory and CPU when executing with spark?

On Sun, Mar 22, 2015 at 3:39 AM fanooos  wrote:

> We have cloudera CDH 5.3 installed on one machine.
>
> We are trying to use spark sql thrift server to execute some analysis
> queries against hive table.
>
> Without any changes in the configurations, we run the following query on
> both hive and spark sql thrift server
>
> *select * from tableName;*
>
> The time taken by spark is larger than the time taken by hive which is not
> supposed to be the like that.
>
> The hive table is mapped to json files stored on HDFS directory and we are
> using *org.openx.data.jsonserde.JsonSerDe* for
> serialization/deserialization.
>
> Why spark takes much more time to execute the query than hive ?
>
>
>
> --
> View this message in context: http://apache-spark-user-list.
> 1001560.n3.nabble.com/Spark-sql-thrift-server-slower-than-
> hive-tp22177.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


SocketTimeout only when launching lots of executors

2015-03-22 Thread Tianshuo Deng
Hi, spark users.

When running a spark application with lots of executors(300+), I see following 
failures:

java.net.SocketTimeoutException: Read timed out  at 
java.net.SocketInputStream.socketRead0(Native Method)  at 
java.net.SocketInputStream.read(SocketInputStream.java:152)  at 
java.net.SocketInputStream.read(SocketInputStream.java:122)  at 
java.io.BufferedInputStream.fill(BufferedInputStream.java:235)  at 
java.io.BufferedInputStream.read1(BufferedInputStream.java:275)  at 
java.io.BufferedInputStream.read(BufferedInputStream.java:334)  at 
sun.net.www.http.HttpClient.parseHTTPHeader(HttpClient.java:690)  at 
sun.net.www.http.HttpClient.parseHTTP(HttpClient.java:633)  at 
sun.net.www.protocol.http.HttpURLConnection.getInputStream(HttpURLConnection.java:1324)
  at org.apache.spark.util.Utils$.doFetchFile(Utils.scala:583)  at 
org.apache.spark.util.Utils$.fetchFile(Utils.scala:421)  at 
org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$updateDependencies$6.apply(Executor.scala:356)
  at 
org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$updateDependencies$6.apply(Executor.scala:353)
  at 
scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
  at 
scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98) 
 at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98) 
 at 
scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)  
at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)  at 
scala.collection.mutable.HashMap.foreach(HashMap.scala:98)  at 
scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)  
at 
org.apache.spark.executor.Executor.org$apache$spark$executor$Executor$$updateDependencies(Executor.scala:353)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:181)  
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
 at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
 at java.lang.Thread.run(Thread.java:745)

When I reduce the number of executors, the spark app runs fine. From the stack 
trace, it looks like that multiple executors requesting downloading 
dependencies at the same time is causing driver to timeout?

Anyone experienced similar issues or has any suggestions?

Thanks
-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Convert Spark SQL table to RDD in Scala / error: value toFloat is a not a member of Any

2015-03-22 Thread Ted Yu
I thought of formation #1.
But looks like when there're many fields, formation #2 is cleaner.

Cheers


On Sun, Mar 22, 2015 at 8:14 PM, Cheng Lian  wrote:

>  You need either
>
> .map { row =>
>   (row(0).asInstanceOf[Float], row(1).asInstanceOf[Float], ...)
> }
>
> or
>
> .map { case Row(f0: Float, f1: Float, ...) =>
>   (f0, f1)
> }
>
> On 3/23/15 9:08 AM, Minnow Noir wrote:
>
> I'm following some online tutorial written in Python and trying to
> convert a Spark SQL table object to an RDD in Scala.
>
>  The Spark SQL just loads a simple table from a CSV file.  The tutorial
> says to convert the table to an RDD.
>
>  The Python is
>
> products_rdd = sqlContext.table("products").map(lambda row:
> (float(row[0]),float(row[1]),float(row[2]),float(row[3]),
> float(row[4]),float(row[5]),float(row[6]),float(row[7]),float(row[8]),float(row[9]),float(row[10]),float(row[11])))
>
>  The Scala is *not*
>
> val productsRdd = sqlContext.table("products").map( row => (
>   row(0).toFloat,row(1).toFloat,row(2).toFloat,row(3).toFloat,
> row(4).toFloat,row(5).toFloat,row(6).toFloat,row(7).toFloat,row(8).toFloat,
> row(9).toFloat,row(10).toFloat,row(11).toFloat
> ))
>
>  I know this, because Spark says that for each of the row(x).toFloat
> calls,
> "error: value toFloat is not a member of Any"
>
>  Does anyone know the proper syntax for this?
>
>  Thank you
>
>
>   ​
>


Re: Convert Spark SQL table to RDD in Scala / error: value toFloat is a not a member of Any

2015-03-22 Thread Cheng Lian

You need either

|.map { row =>
  (row(0).asInstanceOf[Float], row(1).asInstanceOf[Float], ...)
}
|

or

|.map {case  Row(f0:Float, f1:Float, ...) =>
  (f0, f1)
}
|

On 3/23/15 9:08 AM, Minnow Noir wrote:

I'm following some online tutorial written in Python and trying to 
convert a Spark SQL table object to an RDD in Scala.


The Spark SQL just loads a simple table from a CSV file.  The tutorial 
says to convert the table to an RDD.


The Python is

products_rdd = sqlContext.table("products").map(lambda row: 
(float(row[0]),float(row[1]),float(row[2]),float(row[3]), 
float(row[4]),float(row[5]),float(row[6]),float(row[7]),float(row[8]),float(row[9]),float(row[10]),float(row[11])))


The Scala is *not*

val productsRdd = sqlContext.table("products").map( row => (
row(0).toFloat,row(1).toFloat,row(2).toFloat,row(3).toFloat, 
row(4).toFloat,row(5).toFloat,row(6).toFloat,row(7).toFloat,row(8).toFloat, 
row(9).toFloat,row(10).toFloat,row(11).toFloat

))

I know this, because Spark says that for each of the row(x).toFloat 
calls,

"error: value toFloat is not a member of Any"

Does anyone know the proper syntax for this?

Thank you



​


Convert Spark SQL table to RDD in Scala / error: value toFloat is a not a member of Any

2015-03-22 Thread Minnow Noir
I'm following some online tutorial written in Python and trying to convert
a Spark SQL table object to an RDD in Scala.

The Spark SQL just loads a simple table from a CSV file.  The tutorial says
to convert the table to an RDD.

The Python is

products_rdd = sqlContext.table("products").map(lambda row:
(float(row[0]),float(row[1]),float(row[2]),float(row[3]),
float(row[4]),float(row[5]),float(row[6]),float(row[7]),float(row[8]),float(row[9]),float(row[10]),float(row[11])))

The Scala is *not*

val productsRdd = sqlContext.table("products").map( row => (
  row(0).toFloat,row(1).toFloat,row(2).toFloat,row(3).toFloat,
row(4).toFloat,row(5).toFloat,row(6).toFloat,row(7).toFloat,row(8).toFloat,
row(9).toFloat,row(10).toFloat,row(11).toFloat
))

I know this, because Spark says that for each of the row(x).toFloat calls,
"error: value toFloat is not a member of Any"

Does anyone know the proper syntax for this?

Thank you


spark disk-to-disk

2015-03-22 Thread Koert Kuipers
i would like to use spark for some algorithms where i make no attempt to
work in memory, so read from hdfs and write to hdfs for every step.
of course i would like every step to only be evaluated once. and i have no
need for spark's RDD lineage info, since i persist to reliable storage.

the trouble is, i am not sure how to proceed.

rdd.checkpoint() seems like the obvious candidate to force my computations
to write to hdfs for intermediate data and cut the lineage, but
rdd.checkpoint() does not actually trigger a job. rdd.checkpoint() runs
after some other action triggered a job, leading to recomputation.

the suggestion in the docs is to do:
rdd.cache(); rdd.checkpoint()
but that wont work for me since the data does not fit in memory.

instead i could do:
rdd.persist(StorageLevel.DISK_ONLY_2); rdd.checkpoint()
but that leads to the data being written to disk twice in a row, which
seems wasteful.

so finally i can resort to:
rdd.saveAsObjectFile(...)
sc.objectFile(...)
but that seems like a rather broken abstraction.

any ideas? i feel like i am missing something obvious. or i am running yet
again into spark's historical in-memory bias?


Re: Should Spark SQL support retrieve column value from Row by column name?

2015-03-22 Thread Michael Armbrust
Please open a JIRA, we added the info to Row that will allow this to
happen, but we need to provide the methods you are asking for.  I'll add
that this does work today in python (i.e. row.columnName).

On Sun, Mar 22, 2015 at 12:40 AM, amghost  wrote:

> I would like to retrieve column value from Spark SQL query result. But
> currently it seems that Spark SQL only support retrieving by index
>
>
> val results = sqlContext.sql("SELECT name FROM people")
> results.map(t => "Name: " + *t(0)*).collect().foreach(println)
>
> I think it will be much more convenient if I could do something like this:
>
> results.map(t => "Name: " + *t("name")*).collect().foreach(println)
>
> How do you like?
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Should-Spark-SQL-support-retrieve-column-value-from-Row-by-column-name-tp22174.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: DataFrame saveAsTable - partitioned tables

2015-03-22 Thread Michael Armbrust
Not yet.  This is on the roadmap for Spark 1.4.

On Sun, Mar 22, 2015 at 12:19 AM, deenar.toraskar 
wrote:

> Hi
>
> I wanted to store DataFrames as partitioned Hive tables. Is there a way to
> do this via the saveAsTable call. The set of options does not seem to be
> documented.
>
> def
> saveAsTable(tableName: String, source: String, mode: SaveMode, options:
> Map[String, String]): Unit
> (Scala-specific) Creates a table from the the contents of this DataFrame
> based on a given data source, SaveMode specified by mode, and a set of
> options.
>
> Optionally is there a way to just create external hive tables for data that
> is already present on HDFS. something similar to
>
> sc.sql("alter table results add partition (date = '2014');")
>
> Regards
> Deenar
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/DataFrame-saveAsTable-partitioned-tables-tp22173.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: DataFrame saveAsTable - partitioned tables

2015-03-22 Thread Michael Armbrust
Note you can use HiveQL syntax for creating dynamically partitioned tables
though.

On Sun, Mar 22, 2015 at 1:29 PM, Michael Armbrust 
wrote:

> Not yet.  This is on the roadmap for Spark 1.4.
>
> On Sun, Mar 22, 2015 at 12:19 AM, deenar.toraskar 
> wrote:
>
>> Hi
>>
>> I wanted to store DataFrames as partitioned Hive tables. Is there a way to
>> do this via the saveAsTable call. The set of options does not seem to be
>> documented.
>>
>> def
>> saveAsTable(tableName: String, source: String, mode: SaveMode, options:
>> Map[String, String]): Unit
>> (Scala-specific) Creates a table from the the contents of this DataFrame
>> based on a given data source, SaveMode specified by mode, and a set of
>> options.
>>
>> Optionally is there a way to just create external hive tables for data
>> that
>> is already present on HDFS. something similar to
>>
>> sc.sql("alter table results add partition (date = '2014');")
>>
>> Regards
>> Deenar
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/DataFrame-saveAsTable-partitioned-tables-tp22173.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>


Re: How to use DataFrame with MySQL

2015-03-22 Thread Michael Armbrust
Can you try adding "driver" -> "com.mysql.jdbc.Driver"?  This causes the
class to get loaded both locally and the workers so that it can register
with JDBC.

On Sun, Mar 22, 2015 at 7:32 AM, gavin zhang  wrote:

> OK, I have known that I could use jdbc connector to create DataFrame with
> this command:
>
> val jdbcDF = sqlContext.load("jdbc", Map("url" ->
> "jdbc:mysql://localhost:3306/video_rcmd?user=root&password=123456",
> "dbtable" -> "video"))
>
> But I got this error:
>
> java.sql.SQLException: No suitable driver found for ...
>
> And I have tried to add jdbc jar to spark_path with both commands but
> failed:
>
> - spark-shell --jars mysql-connector-java-5.0.8-bin.jar
> - SPARK_CLASSPATH=mysql-connector-java-5.0.8-bin.jar spark-shell
>
> My Spark version is 1.3.0 while
> `Class.forName("com.mysql.jdbc.Driver").newInstance` is worked.
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-use-DataFrame-with-MySQL-tp22178.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: join two DataFrames, same column name

2015-03-22 Thread Michael Armbrust
You can include * and a column alias in the same select clause
var df1 = sqlContext.sql("select *, column_id AS table1_id from table1")


I'm also hoping to resolve SPARK-6376
 before Spark 1.3.1 which
will let you do something like:
var df1 = sqlContext.sql("select * from table1").as("t1")
var df2 = sqlContext.sql("select * from table2).as("t2")
df1.join(df2, df1("column_id") === df2("column_id")).select("t1.column_id")

Finally, there is SPARK-6380
 that hopes to simplify
this particular case.

Michael

On Sat, Mar 21, 2015 at 3:02 PM, Eric Friedman 
wrote:

> I have a couple of data frames that I pulled from SparkSQL and the primary
> key of one is a foreign key of the same name in the other.  I'd rather not
> have to specify each column in the SELECT statement just so that I can
> rename this single column.
>
> When I try to join the data frames, I get an exception because it finds
> the two columns of the same name to be ambiguous.  Is there a way to
> specify which side of the join comes from data frame A and which comes from
> B?
>
> var df1 = sqlContext.sql("select * from table1")
> var df2 = sqlContext.sql("select * from table2)
>
> df1.join(df2, df1("column_id") === df2("column_id"))
>


Re: netlib-java cannot load native lib in Windows when using spark-submit

2015-03-22 Thread Burak Yavuz
Did you build Spark with: -Pnetlib-lgpl?

Ref: https://spark.apache.org/docs/latest/mllib-guide.html

Burak

On Sun, Mar 22, 2015 at 7:37 AM, Ted Yu  wrote:

> How about pointing LD_LIBRARY_PATH to native lib folder ?
>
> You need Spark 1.2.0 or higher for the above to work. See SPARK-1719
>
> Cheers
>
> On Sun, Mar 22, 2015 at 4:02 AM, Xi Shen  wrote:
>
>> Hi Ted,
>>
>> I have tried to invoke the command from both cygwin environment and
>> powershell environment. I still get the messages:
>>
>> 15/03/22 21:56:00 WARN netlib.BLAS: Failed to load implementation from:
>> com.github.fommil.netlib.NativeSystemBLAS
>> 15/03/22 21:56:00 WARN netlib.BLAS: Failed to load implementation from:
>> com.github.fommil.netlib.NativeRefBLAS
>>
>> From the Spark UI, I can see:
>>
>>   spark.driver.extraLibrary c:\openblas
>>
>>
>> Thanks,
>> David
>>
>>
>> On Sun, Mar 22, 2015 at 11:45 AM Ted Yu  wrote:
>>
>>> Can you try the --driver-library-path option ?
>>>
>>> spark-submit --driver-library-path /opt/hadoop/lib/native ...
>>>
>>> Cheers
>>>
>>> On Sat, Mar 21, 2015 at 4:58 PM, Xi Shen  wrote:
>>>
 Hi,

 I use the *OpenBLAS* DLL, and have configured my application to work
 in IDE. When I start my Spark application from IntelliJ IDE, I can see in
 the log that the native lib is loaded successfully.

 But if I use *spark-submit* to start my application, the native lib
 still cannot be load. I saw the WARN message that it failed to load both
 the native and native-ref library. I checked the *Environment* tab in
 the Spark UI, and the *java.library.path* is set correctly.


 Thanks,

 David



>>>
>


Re: lower&upperBound not working/spark 1.3

2015-03-22 Thread Ted Yu
I went over JDBCRelation#columnPartition() but didn't find obvious clue
(you can add more logging to confirm that the partitions were generated
correctly).

Looks like the issue may be somewhere else.

Cheers

On Sun, Mar 22, 2015 at 12:47 PM, Marek Wiewiorka  wrote:

> ...I even tried setting upper/lower bounds to the same value like 1 or 10
> with the same result.
> cs_id is a column of the cardinality ~5*10^6
> So this is not the case here.
>
> Regards,
> Marek
>
> 2015-03-22 20:30 GMT+01:00 Ted Yu :
>
>> From javadoc of JDBCRelation#columnPartition():
>>* Given a partitioning schematic (a column of integral type, a number
>> of
>>* partitions, and upper and lower bounds on the column's value),
>> generate
>>
>> In your example, 1 and 1 are for the value of cs_id column.
>>
>> Looks like all the values in that column fall within the range of 1 and
>> 1000.
>>
>> Cheers
>>
>> On Sun, Mar 22, 2015 at 8:44 AM, Marek Wiewiorka <
>> marek.wiewio...@gmail.com> wrote:
>>
>>> Hi All - I try to use the new SQLContext API for populating DataFrame
>>> from jdbc data source.
>>> like this:
>>>
>>> val jdbcDF = sqlContext.jdbc(url =
>>> "jdbc:postgresql://localhost:5430/dbname?user=user&password=111", table =
>>> "se_staging.exp_table3" ,columnName="cs_id",lowerBound=1 ,upperBound =
>>> 1, numPartitions=12 )
>>>
>>> No matter how I set lower and upper bounds I always get all the rows
>>> from my table.
>>> The API is marked as experimental so I assume there might by some bugs
>>> in it but
>>> did anybody come across a similar issue?
>>>
>>> Thanks!
>>>
>>
>>
>


Re: lower&upperBound not working/spark 1.3

2015-03-22 Thread Marek Wiewiorka
...I even tried setting upper/lower bounds to the same value like 1 or 10
with the same result.
cs_id is a column of the cardinality ~5*10^6
So this is not the case here.

Regards,
Marek

2015-03-22 20:30 GMT+01:00 Ted Yu :

> From javadoc of JDBCRelation#columnPartition():
>* Given a partitioning schematic (a column of integral type, a number of
>* partitions, and upper and lower bounds on the column's value),
> generate
>
> In your example, 1 and 1 are for the value of cs_id column.
>
> Looks like all the values in that column fall within the range of 1 and
> 1000.
>
> Cheers
>
> On Sun, Mar 22, 2015 at 8:44 AM, Marek Wiewiorka <
> marek.wiewio...@gmail.com> wrote:
>
>> Hi All - I try to use the new SQLContext API for populating DataFrame
>> from jdbc data source.
>> like this:
>>
>> val jdbcDF = sqlContext.jdbc(url =
>> "jdbc:postgresql://localhost:5430/dbname?user=user&password=111", table =
>> "se_staging.exp_table3" ,columnName="cs_id",lowerBound=1 ,upperBound =
>> 1, numPartitions=12 )
>>
>> No matter how I set lower and upper bounds I always get all the rows from
>> my table.
>> The API is marked as experimental so I assume there might by some bugs in
>> it but
>> did anybody come across a similar issue?
>>
>> Thanks!
>>
>
>


Re: lower&upperBound not working/spark 1.3

2015-03-22 Thread Ted Yu
>From javadoc of JDBCRelation#columnPartition():
   * Given a partitioning schematic (a column of integral type, a number of
   * partitions, and upper and lower bounds on the column's value), generate

In your example, 1 and 1 are for the value of cs_id column.

Looks like all the values in that column fall within the range of 1 and
1000.

Cheers

On Sun, Mar 22, 2015 at 8:44 AM, Marek Wiewiorka 
wrote:

> Hi All - I try to use the new SQLContext API for populating DataFrame from
> jdbc data source.
> like this:
>
> val jdbcDF = sqlContext.jdbc(url =
> "jdbc:postgresql://localhost:5430/dbname?user=user&password=111", table =
> "se_staging.exp_table3" ,columnName="cs_id",lowerBound=1 ,upperBound =
> 1, numPartitions=12 )
>
> No matter how I set lower and upper bounds I always get all the rows from
> my table.
> The API is marked as experimental so I assume there might by some bugs in
> it but
> did anybody come across a similar issue?
>
> Thanks!
>


Re: Load balancing

2015-03-22 Thread Mohit Anchlia
posting my question again :)

Thanks for the pointer, looking at the below description from the site it
looks like in spark block size is not fixed, it's determined by block
interval and in fact for the same batch you could have different block
sizes. Did I get it right?

-
Another parameter that should be considered is the receiver’s blocking
interval, which is determined by the configuration parameter

spark.streaming.blockInterval. For most receivers, the received data is
coalesced together into blocks of data before storing inside Spark’s
memory. The number of blocks in each batch determines the number of tasks
that will be used to process those the received data in a map-like
transformation. The number of tasks per receiver per batch will be
approximately (batch interval / block interval). For example, block
interval of 200 ms will create 10 tasks per 2 second batches. Too low the
number of tasks (that is, less than the number of cores per machine), then
it will be inefficient as all available cores will not be used to process
the data. To increase the number of tasks for a given batch interval,
reduce the block interval. However, the recommended minimum value of block
interval is about 50 ms, below which the task launching overheads may be a
problem.
--


Also, I am not clear about the data flow of the receiver. When client gets
handle to a spark context and calls something like
"val lines = ssc.socketTextStream("localhost", )", is this the point
when spark master is contacted to determine which spark worker node the
data is going to go to?

On Sun, Mar 22, 2015 at 2:10 AM, Jeffrey Jedele 
wrote:

> Hi Mohit,
> please make sure you use the "Reply to all" button and include the mailing
> list, otherwise only I will get your message ;)
>
> Regarding your question:
> Yes, that's also my understanding. You can partition streaming RDDs only
> by time intervals, not by size. So depending on your incoming rate, they
> may vary.
>
> I do not know exactly what the life cycle of the receiver is, but I don't
> think sth actually happens when you create the DStream. My guess would be
> that the receiver is allocated when you call
> StreamingContext#startStreams(),
>
> Regards,
> Jeff
>
> 2015-03-21 21:19 GMT+01:00 Mohit Anchlia :
>
>> Could somebody help me understand the question I posted earlier?
>>
>> On Fri, Mar 20, 2015 at 9:44 AM, Mohit Anchlia 
>> wrote:
>>
>>> Thanks for the pointer, looking at the below description from the site
>>> it looks like in spark block size is not fixed, it's determined by block
>>> interval and in fact for the same batch you could have different block
>>> sizes. Did I get it right?
>>>
>>> -
>>> Another parameter that should be considered is the receiver’s blocking
>>> interval, which is determined by the configuration parameter
>>> 
>>> spark.streaming.blockInterval. For most receivers, the received data is
>>> coalesced together into blocks of data before storing inside Spark’s
>>> memory. The number of blocks in each batch determines the number of tasks
>>> that will be used to process those the received data in a map-like
>>> transformation. The number of tasks per receiver per batch will be
>>> approximately (batch interval / block interval). For example, block
>>> interval of 200 ms will create 10 tasks per 2 second batches. Too low the
>>> number of tasks (that is, less than the number of cores per machine), then
>>> it will be inefficient as all available cores will not be used to process
>>> the data. To increase the number of tasks for a given batch interval,
>>> reduce the block interval. However, the recommended minimum value of block
>>> interval is about 50 ms, below which the task launching overheads may be a
>>> problem.
>>> --
>>>
>>>
>>> Also, I am not clear about the data flow of the receiver. When client
>>> gets handle to a spark context and calls something like "val lines = ssc
>>> .socketTextStream("localhost", )", is this the point when spark
>>> master is contacted to determine which spark worker node the data is going
>>> to go to?
>>>
>>> On Fri, Mar 20, 2015 at 1:33 AM, Jeffrey Jedele <
>>> jeffrey.jed...@gmail.com> wrote:
>>>
 Hi Mohit,
 it also depends on what the source for your streaming application is.

 If you use Kafka, you can easily partition topics and have multiple
 receivers on different machines.

 If you have sth like a HTTP, socket, etc stream, you probably can't do
 that. The Spark RDDs generated by your receiver will be partitioned and
 processed in a distributed manner like usual Spark RDDs however. There are
 parameters to control that behavior (e.g. defaultParallelism and
 blockInterval).

 See here for more details:

 http://spark.apache.org/docs/1.2.1/streaming-programming-g

Re: How Does aggregate work

2015-03-22 Thread Ted Yu
I assume spark.default.parallelism is 4 in the VM Ashish was using.

Cheers


lower&upperBound not working/spark 1.3

2015-03-22 Thread Marek Wiewiorka
Hi All - I try to use the new SQLContext API for populating DataFrame from
jdbc data source.
like this:

val jdbcDF = sqlContext.jdbc(url =
"jdbc:postgresql://localhost:5430/dbname?user=user&password=111", table =
"se_staging.exp_table3" ,columnName="cs_id",lowerBound=1 ,upperBound =
1, numPartitions=12 )

No matter how I set lower and upper bounds I always get all the rows from
my table.
The API is marked as experimental so I assume there might by some bugs in
it but
did anybody come across a similar issue?

Thanks!


How to check that a dataset is sorted after it has been written out? [repost]

2015-03-22 Thread Michael Albert
Greetings![My apologies for this respost, I'm not certain that the first 
message made it to the list].
I sorted a dataset in Spark and then wrote it out in avro/parquet.
Then I wanted to check that it was sorted.
It looks like each partition has been sorted, but when reading in, the first 
"partition" (i.e., as seen in the partition index of mapPartitionsWithIndex) is 
not the same  as implied by the names of the parquet files (even when the 
number of partitions is the same in therdd which was read as on disk).
If I "take()" a few hundred values, they are sorted, but they are *not* the 
same as if I explicitly open "part-r-0.parquet" and take values from that.
It seems that when opening the rdd, the "partitions" of the rdd are not in the 
sameorder as implied by the data on disk (i.e., "part-r-0.parquet, 
part-r-1.parquet, etc).
So, how might one read the data so that one maintains the sort order?
And while on the subject, after the "terasort", how did they check that the 
data was actually sorted correctly? (or did they :-) ? ).
Is there any way to read the data back in so as to preserve the sort, or do I 
need to "zipWithIndex" before writing it out, and write the index at that time? 
(I haven't tried the latter yet).
Thanks!-Mike



Re: Error while installing Spark 1.3.0 on local machine

2015-03-22 Thread Dean Wampler
Any particular reason you're not just downloading a build from
http://spark.apache.org/downloads.html Even if you aren't using Hadoop, any
of those builds will work.

If you want to build from source, the Maven build is more reliable.

dean

Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition
 (O'Reilly)
Typesafe 
@deanwampler 
http://polyglotprogramming.com

On Sat, Mar 21, 2015 at 5:52 PM, HARIPRIYA AYYALASOMAYAJULA <
aharipriy...@gmail.com> wrote:

> Hello,
>
> I am trying to install Spark 1.3.0 on my mac. Earlier, I was working with
> Spark 1.1.0. Now, I come across this error :
>
> sbt.ResolveException: unresolved dependency:
> org.apache.spark#spark-network-common_2.10;1.3.0: configuration not public
> in org.apache.spark#spark-network-common_2.10;1.3.0: 'test'. It was
> required from org.apache.spark#spark-network-shuffle_2.10;1.3.0 test
> at sbt.IvyActions$.sbt$IvyActions$$resolve(IvyActions.scala:278)
> at sbt.IvyActions$$anonfun$updateEither$1.apply(IvyActions.scala:175)
> at sbt.IvyActions$$anonfun$updateEither$1.apply(IvyActions.scala:157)
> at sbt.IvySbt$Module$$anonfun$withModule$1.apply(Ivy.scala:151)
> at sbt.IvySbt$Module$$anonfun$withModule$1.apply(Ivy.scala:151)
> at sbt.IvySbt$$anonfun$withIvy$1.apply(Ivy.scala:128)
> at sbt.IvySbt.sbt$IvySbt$$action$1(Ivy.scala:56)
> at sbt.IvySbt$$anon$4.call(Ivy.scala:64)
> at xsbt.boot.Locks$GlobalLock.withChannel$1(Locks.scala:93)
> at
> xsbt.boot.Locks$GlobalLock.xsbt$boot$Locks$GlobalLock$$withChannelRetries$1(Locks.scala:78)
> at xsbt.boot.Locks$GlobalLock$$anonfun$withFileLock$1.apply(Locks.scala:97)
> at xsbt.boot.Using$.withResource(Using.scala:10)
> at xsbt.boot.Using$.apply(Using.scala:9)
> at xsbt.boot.Locks$GlobalLock.ignoringDeadlockAvoided(Locks.scala:58)
> at xsbt.boot.Locks$GlobalLock.withLock(Locks.scala:48)
> at xsbt.boot.Locks$.apply0(Locks.scala:31)
> at xsbt.boot.Locks$.apply(Locks.scala:28)
> at sbt.IvySbt.withDefaultLogger(Ivy.scala:64)
> at sbt.IvySbt.withIvy(Ivy.scala:123)
> at sbt.IvySbt.withIvy(Ivy.scala:120)
> at sbt.IvySbt$Module.withModule(Ivy.scala:151)
> at sbt.IvyActions$.updateEither(IvyActions.scala:157)
> at
> sbt.Classpaths$$anonfun$sbt$Classpaths$$work$1$1.apply(Defaults.scala:1318)
> at
> sbt.Classpaths$$anonfun$sbt$Classpaths$$work$1$1.apply(Defaults.scala:1315)
> at
> sbt.Classpaths$$anonfun$doWork$1$1$$anonfun$85.apply(Defaults.scala:1345)
> at
> sbt.Classpaths$$anonfun$doWork$1$1$$anonfun$85.apply(Defaults.scala:1343)
> at sbt.Tracked$$anonfun$lastOutput$1.apply(Tracked.scala:35)
> at sbt.Classpaths$$anonfun$doWork$1$1.apply(Defaults.scala:1348)
> at sbt.Classpaths$$anonfun$doWork$1$1.apply(Defaults.scala:1342)
> at sbt.Tracked$$anonfun$inputChanged$1.apply(Tracked.scala:45)
> at sbt.Classpaths$.cachedUpdate(Defaults.scala:1360)
> at sbt.Classpaths$$anonfun$updateTask$1.apply(Defaults.scala:1300)
> at sbt.Classpaths$$anonfun$updateTask$1.apply(Defaults.scala:1275)
> at scala.Function1$$anonfun$compose$1.apply(Function1.scala:47)
> at sbt.$tilde$greater$$anonfun$$u2219$1.apply(TypeFunctions.scala:40)
> at sbt.std.Transform$$anon$4.work(System.scala:63)
> at sbt.Execute$$anonfun$submit$1$$anonfun$apply$1.apply(Execute.scala:226)
> at sbt.Execute$$anonfun$submit$1$$anonfun$apply$1.apply(Execute.scala:226)
> at sbt.ErrorHandling$.wideConvert(ErrorHandling.scala:17)
> at sbt.Execute.work(Execute.scala:235)
> at sbt.Execute$$anonfun$submit$1.apply(Execute.scala:226)
> at sbt.Execute$$anonfun$submit$1.apply(Execute.scala:226)
> at
> sbt.ConcurrentRestrictions$$anon$4$$anonfun$1.apply(ConcurrentRestrictions.scala:159)
> at sbt.CompletionService$$anon$2.call(CompletionService.scala:28)
> at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
> at java.util.concurrent.FutureTask.run(FutureTask.java:138)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:439)
> at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
> at java.util.concurrent.FutureTask.run(FutureTask.java:138)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918)
> at java.lang.Thread.run(Thread.java:695)
> [error] (network-shuffle/*:update) sbt.ResolveException: unresolved
> dependency: org.apache.spark#spark-network-common_2.10;1.3.0: configuration
> not public in org.apache.spark#spark-network-common_2.10;1.3.0: 'test'. It
> was required from org.apache.spark#spark-network-shuffle_2.10;1.3.0 test
> [error] Total time: 5 s, completed Mar 21, 2015 7:48:45 PM
>
> I tried uninstalling and re - installing, when I browsed over the
> internet, I came across suggestions to include -Phadoop, now even if I use
>
>  build/sbt -Pyarn -Phadoop-2.3 assembly
>
> It gives me an error.
>
> I greatly appreciate any help. Thank you for your time.
>
>
> --
> Regards,
> Haripriya Ayyalasomay

Re: can distinct transform applied on DStream?

2015-03-22 Thread Dean Wampler
aDstream.transform(_.distinct())  will only make the elements of each RDD
in the DStream distinct, not for the whole DStream globally. Is that what
you're seeing?

Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition
 (O'Reilly)
Typesafe 
@deanwampler 
http://polyglotprogramming.com

On Fri, Mar 20, 2015 at 10:37 AM, Darren Hoo  wrote:

> val aDstream = ...
>
> val distinctStream = aDstream.transform(_.distinct())
>
> but the elements in distinctStream  are not distinct.
>
> Did I use it wrong?
>


Re: How Does aggregate work

2015-03-22 Thread Dean Wampler
2 is added every time the final partition aggregator is called. The result
of summing the elements across partitions is 9 of course. If you force a
single partition (using spark-shell in local mode):

scala> val data = sc.parallelize(List(2,3,4),1)
scala> data.aggregate(0)((x,y) => x+y,(x,y) => 2+x+y)
res0: Int = 11

The 2nd function is still called, even though there is only one partition
(presumably either x or y is set to 0).

For every additional partition you specify as the 2nd arg. to parallelize,
the 2nd function will be called again:


scala> val data = sc.parallelize(List(2,3,4),1)
scala> data.aggregate(0)((x,y) => x+y,(x,y) => 2+x+y)
res0: Int = 11

scala> val data = sc.parallelize(List(2,3,4),2)
scala> data.aggregate(0)((x,y) => x+y,(x,y) => 2+x+y)
res0: Int = 13

scala> val data = sc.parallelize(List(2,3,4),3)
scala> data.aggregate(0)((x,y) => x+y,(x,y) => 2+x+y)
res0: Int = 15

scala> val data = sc.parallelize(List(2,3,4),4)
scala> data.aggregate(0)((x,y) => x+y,(x,y) => 2+x+y)
res0: Int = 17

Hence, it appears that not specifying the 2nd argument resulted in 4
partitions, even though you only had three elements in the list.

If p_i is the ith partition, the final sum appears to be:

(2 + ... (2 + (2 + (2 + 0 + p_1) + p_2) + p_3) ...)


Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition
 (O'Reilly)
Typesafe 
@deanwampler 
http://polyglotprogramming.com

On Sun, Mar 22, 2015 at 8:05 AM, ashish.usoni 
wrote:

> Hi ,
> I am not able to understand how aggregate function works, Can some one
> please explain how below result came
> I am running spark using cloudera VM
>
> The result in below is 17 but i am not able to find out how it is
> calculating 17
> val data = sc.parallelize(List(2,3,4))
> data.aggregate(0)((x,y) => x+y,(x,y) => 2+x+y)
> *res21: Int = 17*
>
> Also when i try to change the 2nd parameter in sc.parallelize i get
> different result
>
> val data = sc.parallelize(List(2,3,4),2)
> data.aggregate(0)((x,y) => x+y,(x,y) => 2+x+y)
> *res21: Int = 13*
>
> Thanks for the help.
>
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/How-Does-aggregate-work-tp22179.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


How Does aggregate work

2015-03-22 Thread ashish.usoni
Hi , 
I am not able to understand how aggregate function works, Can some one
please explain how below result came 
I am running spark using cloudera VM 

The result in below is 17 but i am not able to find out how it is
calculating 17
val data = sc.parallelize(List(2,3,4))
data.aggregate(0)((x,y) => x+y,(x,y) => 2+x+y)
*res21: Int = 17*

Also when i try to change the 2nd parameter in sc.parallelize i get
different result 

val data = sc.parallelize(List(2,3,4),2)
data.aggregate(0)((x,y) => x+y,(x,y) => 2+x+y)
*res21: Int = 13*

Thanks for the help.





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-Does-aggregate-work-tp22179.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: netlib-java cannot load native lib in Windows when using spark-submit

2015-03-22 Thread Ted Yu
How about pointing LD_LIBRARY_PATH to native lib folder ?

You need Spark 1.2.0 or higher for the above to work. See SPARK-1719

Cheers

On Sun, Mar 22, 2015 at 4:02 AM, Xi Shen  wrote:

> Hi Ted,
>
> I have tried to invoke the command from both cygwin environment and
> powershell environment. I still get the messages:
>
> 15/03/22 21:56:00 WARN netlib.BLAS: Failed to load implementation from:
> com.github.fommil.netlib.NativeSystemBLAS
> 15/03/22 21:56:00 WARN netlib.BLAS: Failed to load implementation from:
> com.github.fommil.netlib.NativeRefBLAS
>
> From the Spark UI, I can see:
>
>   spark.driver.extraLibrary c:\openblas
>
>
> Thanks,
> David
>
>
> On Sun, Mar 22, 2015 at 11:45 AM Ted Yu  wrote:
>
>> Can you try the --driver-library-path option ?
>>
>> spark-submit --driver-library-path /opt/hadoop/lib/native ...
>>
>> Cheers
>>
>> On Sat, Mar 21, 2015 at 4:58 PM, Xi Shen  wrote:
>>
>>> Hi,
>>>
>>> I use the *OpenBLAS* DLL, and have configured my application to work in
>>> IDE. When I start my Spark application from IntelliJ IDE, I can see in the
>>> log that the native lib is loaded successfully.
>>>
>>> But if I use *spark-submit* to start my application, the native lib
>>> still cannot be load. I saw the WARN message that it failed to load both
>>> the native and native-ref library. I checked the *Environment* tab in
>>> the Spark UI, and the *java.library.path* is set correctly.
>>>
>>>
>>> Thanks,
>>>
>>> David
>>>
>>>
>>>
>>


How to use DataFrame with MySQL

2015-03-22 Thread gavin zhang
OK, I have known that I could use jdbc connector to create DataFrame with
this command:

val jdbcDF = sqlContext.load("jdbc", Map("url" ->
"jdbc:mysql://localhost:3306/video_rcmd?user=root&password=123456",
"dbtable" -> "video"))

But I got this error: 

java.sql.SQLException: No suitable driver found for ...

And I have tried to add jdbc jar to spark_path with both commands but
failed:

- spark-shell --jars mysql-connector-java-5.0.8-bin.jar
- SPARK_CLASSPATH=mysql-connector-java-5.0.8-bin.jar spark-shell

My Spark version is 1.3.0 while
`Class.forName("com.mysql.jdbc.Driver").newInstance` is worked.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-use-DataFrame-with-MySQL-tp22178.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: ArrayIndexOutOfBoundsException in ALS.trainImplicit

2015-03-22 Thread Sabarish Sasidharan
My bad. This was an outofmemory disguised as something else.

Regards
Sab

On Sun, Mar 22, 2015 at 1:53 AM, Sabarish Sasidharan <
sabarish.sasidha...@manthan.com> wrote:

> I am consistently running into this ArrayIndexOutOfBoundsException issue
> when using trainImplicit. I have tried changing the partitions and
> switching to JavaSerializer. But they don't seem to help. I see that this
> is the same as https://issues.apache.org/jira/browse/SPARK-3080. My
> lambda is 0.01, rank is 5,  iterations is 10 and alpha is 0.01. I am using
> 41 executors, each with 8GB on a 48 million dataset.
>
> 15/03/21 13:07:29 ERROR executor.Executor: Exception in task 12.0 in stage
> 2808.0 (TID 40575)
> java.lang.ArrayIndexOutOfBoundsException: 692
> at
> org.apache.spark.mllib.recommendation.ALS$$anonfun$org$apache$spark$mllib$recommendation$ALS$$updateBlock$1.apply$mcVI$sp(ALS.scala:548)
> at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
> at org.apache.spark.mllib.recommendation.ALS.org
> $apache$spark$mllib$recommendation$ALS$$updateBlock(ALS.scala:542)
> at
> org.apache.spark.mllib.recommendation.ALS$$anonfun$org$apache$spark$mllib$recommendation$ALS$$updateFeatures$2.apply(ALS.scala:510)
> at
> org.apache.spark.mllib.recommendation.ALS$$anonfun$org$apache$spark$mllib$recommendation$ALS$$updateFeatures$2.apply(ALS.scala:509)
> at
> org.apache.spark.rdd.MappedValuesRDD$$anonfun$compute$1.apply(MappedValuesRDD.scala:31)
> at
> org.apache.spark.rdd.MappedValuesRDD$$anonfun$compute$1.apply(MappedValuesRDD.scala:31)
>
> How can I get around this issue?
>
> ​Regards
> Sab
>
> --
>
> Architect - Big Data
> Ph: +91 99805 99458
>
> Manthan Systems | *Company of the year - Analytics (2014 Frost and
> Sullivan India ICT)*
> +++
>



-- 

Architect - Big Data
Ph: +91 99805 99458

Manthan Systems | *Company of the year - Analytics (2014 Frost and Sullivan
India ICT)*
+++


Re: Reducing Spark's logging verbosity

2015-03-22 Thread Emre Sevinc
Hello Edmon,

Does the following help?

http://stackoverflow.com/questions/27248997/how-to-suppress-spark-logging-in-unit-tests/2736#2736

--
Emre Sevinç
http://www.bigindustries.be
On Mar 22, 2015 1:44 AM, "Edmon Begoli"  wrote:

> Hi,
> Does anyone have concrete recommendations how to reduce Spark's logging
> verbosity.
>
> We have attempted on several occasions to address this by setting various
> log4j properties, both in configuration property files and in
> $SPARK_HOME/conf/ spark-env.sh; however, all of those attempts have failed.
>
> Any suggestions are welcome.
>
> Thank you,
> Edmon
>


Re: converting DStream[String] into RDD[String] in spark streaming

2015-03-22 Thread Sean Owen
On Sun, Mar 22, 2015 at 8:43 AM, deenar.toraskar  wrote:
> 1) if there are no sliding window calls in this streaming context, will
> there just one file written per interval?

As many files as there are partitions will be written in each interval.

> 2) if there is a sliding window call in the same context, such as
>
> val hashTags = stream.flatMap(json =>
> DataObjectFactory.createStatus(json).getText.split("
> ").filter(_.startsWith("#")))
>
> val topCounts60 = hashTags.map((_, 1)).reduceByKeyAndWindow(_ + _,
> Seconds(600))
>  .map{case (topic, count) => (count, topic)}
>  .transform(_.sortByKey(false))
>
> will the some files get written multiples time (as long as the interval is
> in the batch)

I don't think it's right to say files will be written many times, but
yes it is my understanding that data will be written many times since
a datum lies in many windows.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: How to set Spark executor memory?

2015-03-22 Thread Xi Shen
OK, I actually got the answer days ago from StackOverflow, but I did not
check it :(

When running in "local" mode, to set the executor memory

- when using spark-submit, use "--driver-memory"
- when running as a Java application, like executing from IDE, set the
"-Xmx" vm option


Thanks,
David


On Sun, Mar 22, 2015 at 2:10 PM Ted Yu  wrote:

> bq. the BLAS native cannot be loaded
>
> Have you tried specifying --driver-library-path option ?
>
> Cheers
>
> On Sat, Mar 21, 2015 at 4:42 PM, Xi Shen  wrote:
>
>> Yeah, I think it is harder to troubleshot the properties issues in a IDE.
>> But the reason I stick to IDE is because if I use spark-submit, the BLAS
>> native cannot be loaded. May be I should open another thread to discuss
>> that.
>>
>> Thanks,
>> David
>>
>> On Sun, 22 Mar 2015 10:38 Xi Shen  wrote:
>>
>>> In the log, I saw
>>>
>>>   MemoryStorage: MemoryStore started with capacity 6.7GB
>>>
>>> But I still can not find where to set this storage capacity.
>>>
>>> On Sat, 21 Mar 2015 20:30 Xi Shen  wrote:
>>>
 Hi Sean,

 It's getting strange now. If I ran from IDE, my executor memory is
 always set to 6.7G, no matter what value I set in code. I have check my
 environment variable, and there's no value of 6.7, or 12.5

 Any idea?

 Thanks,
 David

 On Tue, 17 Mar 2015 00:35 null  wrote:

>  Hi Xi Shen,
>
> You could set the spark.executor.memory in the code itself . new 
> SparkConf()..set("spark.executor.memory", "2g")
>
> Or you can try the -- spark.executor.memory 2g while submitting the
> jar.
>
>
>
> Regards
>
> Jishnu Prathap
>
>
>
> *From:* Akhil Das [mailto:ak...@sigmoidanalytics.com]
> *Sent:* Monday, March 16, 2015 2:06 PM
> *To:* Xi Shen
> *Cc:* user@spark.apache.org
> *Subject:* Re: How to set Spark executor memory?
>
>
>
> By default spark.executor.memory is set to 512m, I'm assuming since
> you are submiting the job using spark-submit and it is not able to 
> override
> the value since you are running in local mode. Can you try it without 
> using
> spark-submit as a standalone project?
>
>
>   Thanks
>
> Best Regards
>
>
>
> On Mon, Mar 16, 2015 at 1:52 PM, Xi Shen 
> wrote:
>
> I set it in code, not by configuration. I submit my jar file to local.
> I am working in my developer environment.
>
>
>
> On Mon, 16 Mar 2015 18:28 Akhil Das 
> wrote:
>
> How are you setting it? and how are you submitting the job?
>
>
>   Thanks
>
> Best Regards
>
>
>
> On Mon, Mar 16, 2015 at 12:52 PM, Xi Shen 
> wrote:
>
> Hi,
>
>
>
> I have set spark.executor.memory to 2048m, and in the UI "Environment"
> page, I can see this value has been set correctly. But in the "Executors"
> page, I saw there's only 1 executor and its memory is 265.4MB. Very 
> strange
> value. why not 256MB, or just as what I set?
>
>
>
> What am I missing here?
>
>
>
>
>
> Thanks,
>
> David
>
>
>
>
>
>
>  The information contained in this electronic message and any
> attachments to this message are intended for the exclusive use of the
> addressee(s) and may contain proprietary, confidential or privileged
> information. If you are not the intended recipient, you should not
> disseminate, distribute or copy this e-mail. Please notify the sender
> immediately and destroy all copies of this message and any attachments.
> WARNING: Computer viruses can be transmitted via email. The recipient
> should check this email and any attachments for the presence of viruses.
> The company accepts no liability for any damage caused by any virus
> transmitted by this email. www.wipro.com
>

>


Re: How to do nested foreach with RDD

2015-03-22 Thread Xi Shen
Hi Reza,

Yes, I just found RDD.cartesian(). Very useful.

Thanks,
David


On Sun, Mar 22, 2015 at 5:08 PM Reza Zadeh  wrote:

> You can do this with the 'cartesian' product method on RDD. For example:
>
> val rdd1 = ...
> val rdd2 = ...
>
> val combinations = rdd1.cartesian(rdd2).filter{ case (a,b) => a < b }
>
> Reza
>
> On Sat, Mar 21, 2015 at 10:37 PM, Xi Shen  wrote:
>
>> Hi,
>>
>> I have two big RDD, and I need to do some math against each pair of them.
>> Traditionally, it is like a nested for-loop. But for RDD, it cause a nested
>> RDD which is prohibited.
>>
>> Currently, I am collecting one of them, then do a nested for-loop, so to
>> avoid nested RDD. But would like to know if there's spark-way to do this.
>>
>>
>> Thanks,
>> David
>>
>>
>


Re: netlib-java cannot load native lib in Windows when using spark-submit

2015-03-22 Thread Xi Shen
Hi Ted,

I have tried to invoke the command from both cygwin environment and
powershell environment. I still get the messages:

15/03/22 21:56:00 WARN netlib.BLAS: Failed to load implementation from:
com.github.fommil.netlib.NativeSystemBLAS
15/03/22 21:56:00 WARN netlib.BLAS: Failed to load implementation from:
com.github.fommil.netlib.NativeRefBLAS

>From the Spark UI, I can see:

  spark.driver.extraLibrary c:\openblas


Thanks,
David


On Sun, Mar 22, 2015 at 11:45 AM Ted Yu  wrote:

> Can you try the --driver-library-path option ?
>
> spark-submit --driver-library-path /opt/hadoop/lib/native ...
>
> Cheers
>
> On Sat, Mar 21, 2015 at 4:58 PM, Xi Shen  wrote:
>
>> Hi,
>>
>> I use the *OpenBLAS* DLL, and have configured my application to work in
>> IDE. When I start my Spark application from IntelliJ IDE, I can see in the
>> log that the native lib is loaded successfully.
>>
>> But if I use *spark-submit* to start my application, the native lib
>> still cannot be load. I saw the WARN message that it failed to load both
>> the native and native-ref library. I checked the *Environment* tab in
>> the Spark UI, and the *java.library.path* is set correctly.
>>
>>
>> Thanks,
>>
>> David
>>
>>
>>
>


Spark sql thrift server slower than hive

2015-03-22 Thread fanooos
We have cloudera CDH 5.3 installed on one machine.

We are trying to use spark sql thrift server to execute some analysis
queries against hive table.

Without any changes in the configurations, we run the following query on
both hive and spark sql thrift server

*select * from tableName;*

The time taken by spark is larger than the time taken by hive which is not
supposed to be the like that.

The hive table is mapped to json files stored on HDFS directory and we are
using *org.openx.data.jsonserde.JsonSerDe* for
serialization/deserialization.

Why spark takes much more time to execute the query than hive ?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-sql-thrift-server-slower-than-hive-tp22177.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Load balancing

2015-03-22 Thread Jeffrey Jedele
Hi Mohit,
please make sure you use the "Reply to all" button and include the mailing
list, otherwise only I will get your message ;)

Regarding your question:
Yes, that's also my understanding. You can partition streaming RDDs only by
time intervals, not by size. So depending on your incoming rate, they may
vary.

I do not know exactly what the life cycle of the receiver is, but I don't
think sth actually happens when you create the DStream. My guess would be
that the receiver is allocated when you call
StreamingContext#startStreams(),

Regards,
Jeff

2015-03-21 21:19 GMT+01:00 Mohit Anchlia :

> Could somebody help me understand the question I posted earlier?
>
> On Fri, Mar 20, 2015 at 9:44 AM, Mohit Anchlia 
> wrote:
>
>> Thanks for the pointer, looking at the below description from the site it
>> looks like in spark block size is not fixed, it's determined by block
>> interval and in fact for the same batch you could have different block
>> sizes. Did I get it right?
>>
>> -
>> Another parameter that should be considered is the receiver’s blocking
>> interval, which is determined by the configuration parameter
>> 
>> spark.streaming.blockInterval. For most receivers, the received data is
>> coalesced together into blocks of data before storing inside Spark’s
>> memory. The number of blocks in each batch determines the number of tasks
>> that will be used to process those the received data in a map-like
>> transformation. The number of tasks per receiver per batch will be
>> approximately (batch interval / block interval). For example, block
>> interval of 200 ms will create 10 tasks per 2 second batches. Too low the
>> number of tasks (that is, less than the number of cores per machine), then
>> it will be inefficient as all available cores will not be used to process
>> the data. To increase the number of tasks for a given batch interval,
>> reduce the block interval. However, the recommended minimum value of block
>> interval is about 50 ms, below which the task launching overheads may be a
>> problem.
>> --
>>
>>
>> Also, I am not clear about the data flow of the receiver. When client
>> gets handle to a spark context and calls something like "val lines = ssc.
>> socketTextStream("localhost", )", is this the point when spark
>> master is contacted to determine which spark worker node the data is going
>> to go to?
>>
>> On Fri, Mar 20, 2015 at 1:33 AM, Jeffrey Jedele > > wrote:
>>
>>> Hi Mohit,
>>> it also depends on what the source for your streaming application is.
>>>
>>> If you use Kafka, you can easily partition topics and have multiple
>>> receivers on different machines.
>>>
>>> If you have sth like a HTTP, socket, etc stream, you probably can't do
>>> that. The Spark RDDs generated by your receiver will be partitioned and
>>> processed in a distributed manner like usual Spark RDDs however. There are
>>> parameters to control that behavior (e.g. defaultParallelism and
>>> blockInterval).
>>>
>>> See here for more details:
>>>
>>> http://spark.apache.org/docs/1.2.1/streaming-programming-guide.html#performance-tuning
>>>
>>> Regards,
>>> Jeff
>>>
>>> 2015-03-20 8:02 GMT+01:00 Akhil Das :
>>>
 1. If you are consuming data from Kafka or any other receiver based
 sources, then you can start 1-2 receivers per worker (assuming you'll have
 min 4 core per worker)

 2. If you are having single receiver or is a fileStream then what you
 can do to distribute the data across machines is to do a repartition.

 Thanks
 Best Regards

 On Thu, Mar 19, 2015 at 11:32 PM, Mohit Anchlia >>> > wrote:

> I am trying to understand how to load balance the incoming data to
> multiple spark streaming workers. Could somebody help me understand how I
> can distribute my incoming data from various sources such that incoming
> data is going to multiple spark streaming nodes? Is it done by spark 
> client
> with help of spark master similar to hadoop client asking namenodes for 
> the
> list of datanodes?
>


>>>
>>
>


Re: converting DStream[String] into RDD[String] in spark streaming

2015-03-22 Thread deenar.toraskar
Sean

Dstream.saveAsTextFiles internally calls foreachRDD and saveAsTextFile for
each interval

  def saveAsTextFiles(prefix: String, suffix: String = "") {
val saveFunc = (rdd: RDD[T], time: Time) => {
  val file = rddToFileName(prefix, suffix, time)
  rdd.saveAsTextFile(file)
}
this.foreachRDD(saveFunc)
  }

val sparkConf = new SparkConf().setAppName("TwitterRawJSON")
val ssc = new StreamingContext(sparkConf, Seconds(30))
stream.saveAsTextFiles("hdfs://localhost:9000/twitterRawJSON")

1) if there are no sliding window calls in this streaming context, will
there just one file written per interval?
2) if there is a sliding window call in the same context, such as

val hashTags = stream.flatMap(json =>
DataObjectFactory.createStatus(json).getText.split("
").filter(_.startsWith("#")))

val topCounts60 = hashTags.map((_, 1)).reduceByKeyAndWindow(_ + _,
Seconds(600))
 .map{case (topic, count) => (count, topic)}
 .transform(_.sortByKey(false))

will the some files get written multiples time (as long as the interval is
in the batch)

Deenar

>>DStream.foreachRDD gives you an RDD[String] for each interval of 
course. I don't think it makes sense to say a DStream can be converted 
into one RDD since it is a stream. The past elements are inherently 
not supposed to stick around for a long time, and future elements 
aren't known. You may consider saving each RDD[String] to HDFS, and 
then simply loading it from HDFS as an RDD[String]. 



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/converting-DStream-String-into-RDD-String-in-spark-streaming-tp20253p22175.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Should Spark SQL support retrieve column value from Row by column name?

2015-03-22 Thread Yanbo Liang
If you use the latest version Spark 1.3, you can use the DataFrame API like

val results = sqlContext.sql("SELECT name FROM people")
results.select("name").show()

2015-03-22 15:40 GMT+08:00 amghost :

> I would like to retrieve column value from Spark SQL query result. But
> currently it seems that Spark SQL only support retrieving by index
>
>
> val results = sqlContext.sql("SELECT name FROM people")
> results.map(t => "Name: " + *t(0)*).collect().foreach(println)
>
> I think it will be much more convenient if I could do something like this:
>
> results.map(t => "Name: " + *t("name")*).collect().foreach(println)
>
> How do you like?
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Should-Spark-SQL-support-retrieve-column-value-from-Row-by-column-name-tp22174.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: 'nested' RDD problem, advise needed

2015-03-22 Thread Victor Tso-Guillen
Something like this?

(2 to alphabetLength toList).map(shift => Object.myFunction(inputRDD,
shift).map(v => shift -> v).foldLeft(Object.myFunction(inputRDD, 1).map(v
=> 1 -> v))(_ union _)

which is an RDD[(Int, Char)]

Problem is that you can't play with RDDs inside of RDDs. The recursive
structure breaks the Spark programming model.

On Sat, Mar 21, 2015 at 10:26 AM, Michael Lewis  wrote:

> Hi,
>
> I wonder if someone can help suggest a solution to my problem, I had a
> simple process working using Strings and now
> want to convert to RDD[Char], the problem is when I end up with a nested
> call as follow:
>
>
> 1) Load a text file into an RDD[Char]
>
> val inputRDD = sc.textFile(“myFile.txt”).flatMap(_.toIterator)
>
>
> 2) I have a method that takes two parameters:
>
> object Foo
> {
> def myFunction(inputRDD: RDD[Char], int val) : RDD[Char]
> ...
>
>
> 3) I have a method that the driver process calls once its loaded the
> inputRDD ‘bar’ as follows:
>
> def bar(inputRDD: Rdd[Char) : Int = {
>
>  val solutionSet = sc.parallelize(1 to alphabetLength
> toList).map(shift => (shift, Object.myFunction(inputRDD,shift)))
>
>
>
> What I’m trying to do is take a list 1..26 and generate a set of tuples {
> (1,RDD(1)), …. (26,RDD(26)) }  which is the inputRDD passed through
> the function above, but with different set of shift parameters.
>
> In my original I could parallelise the algorithm fine, but my input string
> had to be in a ‘String’ variable, I’d rather it be an RDD
> (string could be large). I think the way I’m trying to do it above won’t
> work because its a nested RDD call.
>
> Can anybody suggest a solution?
>
> Regards,
> Mike Lewis
>
>
>
>
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Should Spark SQL support retrieve column value from Row by column name?

2015-03-22 Thread amghost
I would like to retrieve column value from Spark SQL query result. But
currently it seems that Spark SQL only support retrieving by index


val results = sqlContext.sql("SELECT name FROM people")
results.map(t => "Name: " + *t(0)*).collect().foreach(println)

I think it will be much more convenient if I could do something like this:

results.map(t => "Name: " + *t("name")*).collect().foreach(println)

How do you like?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Should-Spark-SQL-support-retrieve-column-value-from-Row-by-column-name-tp22174.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



DataFrame saveAsTable - partitioned tables

2015-03-22 Thread deenar.toraskar
Hi

I wanted to store DataFrames as partitioned Hive tables. Is there a way to
do this via the saveAsTable call. The set of options does not seem to be
documented.

def
saveAsTable(tableName: String, source: String, mode: SaveMode, options:
Map[String, String]): Unit
(Scala-specific) Creates a table from the the contents of this DataFrame
based on a given data source, SaveMode specified by mode, and a set of
options.

Optionally is there a way to just create external hive tables for data that
is already present on HDFS. something similar to 

sc.sql("alter table results add partition (date = '2014');")

Regards
Deenar



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/DataFrame-saveAsTable-partitioned-tables-tp22173.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org