What is a taskBinary for a ShuffleMapTask? What is its purpose?

2015-09-21 Thread Muler
Hi,

What is the purpose of the taskBinary for a ShuffleMapTask? What does it
contain and how is it useful? Is it the representation of all the RDD
operations that will be applied to the partition that the task will be
processing? (In the case below the task will process stage 0, partition 0.)
If it is not a representation of the RDD operations inside the stage, then
how does a task know which operations it should apply to its partition?

Thanks,

*{ShuffleMapTask@9034} "ShuffleMapTask(0, 0)"*
 *taskBinary* = {TorrentBroadcast@8204} "Broadcast(1)"
  org$apache$spark$broadcast$TorrentBroadcast$$evidence$1 =
{ClassTag$$anon$1@8470} "Array[byte]"
  org$apache$spark$broadcast$TorrentBroadcast$$broadcastId =
{BroadcastBlockId@8249} "broadcast_1"
  numBlocks = 1
  _value = null
  org$apache$spark$broadcast$TorrentBroadcast$$compressionCodec = {Some@8468}
"Some(org.apache.spark.io.SnappyCompressionCodec@7ede98e1)"
  blockSize = 4194304
  bitmap$trans$0 = false
  id = 1
  org$apache$spark$broadcast$Broadcast$$_destroySite = {String@5327} ""
  _isValid = true
  org$apache$spark$Logging$$log_ = null
* partition* = {HadoopPartition@9049}
* locs* = {$colon$colon@9050} "::" size = 1
* preferredLocs* = {ArrayBuffer@9051} "ArrayBuffer" size = 1
* org$apache$spark$Logging$$log_* = null
* stageId* = 0
* partitionId* = 0
* taskMemoryManager* = null
* epoch* = -1
* metrics* = {None$@5261} "None"
* _executorDeserializeTime* = 0
* context* = null
* taskThread* = null
* _killed* = false

etc..
etc..


Re: question building spark in a virtual machine

2015-09-21 Thread Eyal Altshuler
Anyone?

On Sun, Sep 20, 2015 at 7:49 AM, Eyal Altshuler 
wrote:

> I allocated almost 6GB of RAM to the ubuntu virtual machine and got the
> same problem.
> I will go over this post and try to zoom in on the java vm settings.
>
> meanwhile - can someone with a working ubuntu machine specify her JVM
> settings?
>
> Thanks,
> Eyal
>
> On Sat, Sep 19, 2015 at 7:49 PM, Ted Yu  wrote:
>
>> Please read this article:
>>
>> http://blogs.vmware.com/apps/2011/06/taking-a-closer-look-at-sizing-the-java-process.html
>>
>> Can you increase the memory given to the ubuntu virtual machine ?
>>
>> Cheers
>>
>> On Sat, Sep 19, 2015 at 9:30 AM, Eyal Altshuler > > wrote:
>>
>>> Hi,
>>>
>>> I allocate 4GB for the ubuntu virtual machine, how to check what is the
>>> maximal available for a jvm process?
>>> Regarding the thread - I see it's related to building on windows.
>>>
>>> Thanks,
>>> Eyal
>>>
>>> On Sat, Sep 19, 2015 at 6:54 PM, Ted Yu  wrote:
>>>
 See also this thread:

 https://bukkit.org/threads/complex-craftbukkit-server-and-java-problem-could-not-reserve-enough-space-for-object-heap.155192/

 Cheers

 On Sat, Sep 19, 2015 at 8:51 AM, Aniket Bhatnagar <
 aniket.bhatna...@gmail.com> wrote:

> Hi Eval
>
> Can you check if your Ubuntu VM has enough RAM allocated to run JVM of
> size 3gb?
>
> thanks,
> Aniket
>
> On Sat, Sep 19, 2015, 9:09 PM Eyal Altshuler 
> wrote:
>
>> Hi,
>>
>> I had configured the MAVEN_OPTS environment variable the same as you
>> wrote.
>> My java version is 1.7.0_75.
>> I didn't customize the JVM heap size specifically. Is there an
>> additional configuration I have to set besides the MAVEN_OPTS
>> configuration?
>>
>> Thanks,
>> Eyal
>>
>> On Sat, Sep 19, 2015 at 5:29 PM, Ted Yu  wrote:
>>
>>> Can you tell us how you configured the JVM heap size ?
>>> Which version of Java are you using ?
>>>
>>> When I build Spark, I do the following:
>>>
>>> export MAVEN_OPTS="-Xmx2g -XX:MaxPermSize=512M
>>> -XX:ReservedCodeCacheSize=512m"
>>>
>>> Cheers
>>>
>>> On Sat, Sep 19, 2015 at 5:31 AM, Eyal Altshuler <
>>> eyal.altshu...@gmail.com> wrote:
>>>
 Hi,
 Trying to build spark in my ubuntu virtual machine, I am getting
 the following error:

 "Error occurred during initialization of VM
 Could not reserve enough space for object heap
 Error: could not create the Java Virtual Machine.
 Error: A fatal exception has occurred. Program will exit".

 I have configured the JVM heap size correctly.

 How can I fix it?

 Thanks,
 Eyal

>>>
>>>
>>

>>>
>>
>


Re: Class cast exception : Spark 1.5

2015-09-21 Thread sim
You likely need to add the Cassandra connector JAR to spark.jars so it is
available to the executors.

Hope this helps,
Sim
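
A minimal sketch of what that can look like when building the SparkConf; the jar path and connector version below are placeholders, not taken from the thread (the jar can equally be passed with --jars on spark-submit):

import org.apache.spark.{SparkConf, SparkContext}

// Ship the Cassandra connector jar to the executors along with the application.
val conf = new SparkConf()
  .setAppName("cassandra-example")
  .set("spark.jars", "/path/to/spark-cassandra-connector_2.10-1.4.0.jar")  // placeholder path/version
val sc = new SparkContext(conf)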



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Class-cast-exception-Spark-1-5-tp24732p24753.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: word count (group by users) in spark

2015-09-21 Thread Aniket Bhatnagar
Unless I am mistaken, in a group by operation, it spills to disk in case
values for a key don't fit in memory.

Thanks,
Aniket

On Mon, Sep 21, 2015 at 10:43 AM Huy Banh  wrote:

> Hi,
>
> If your input format is user -> comment, then you could:
>
> val comments = sc.parallelize(List(("u1", "one two one"), ("u2", "three
> four three")))
> val wordCounts = comments.
>flatMap({case (user, comment) =>
> for (word <- comment.split(" ")) yield(((user, word), 1)) }).
>reduceByKey(_ + _)
>
> val output = wordCounts.
>map({case ((user, word), count) => (user, (word, count))}).
>groupByKey()
>
> By Aniket, if we group by user first, it could run out of memory when
> spark tries to put all words in a single sequence, couldn't it?
>
> On Sat, Sep 19, 2015 at 11:05 PM Aniket Bhatnagar <
> aniket.bhatna...@gmail.com> wrote:
>
>> Using scala API, you can first group by user and then use combineByKey.
>>
>> Thanks,
>> Aniket
>>
>> On Sat, Sep 19, 2015, 6:41 PM kali.tumm...@gmail.com <
>> kali.tumm...@gmail.com> wrote:
>>
>>> Hi All,
>>> I would like to achieve this below output using spark , I managed to
>>> write
>>> in Hive and call it in spark but not in just spark (scala), how to group
>>> word counts on particular user (column) for example.
>>> Imagine users and their given tweets I want to do word count based on
>>> user
>>> name.
>>>
>>> Input:-
>>> kaliA,B,A,B,B
>>> james B,A,A,A,B
>>>
>>> Output:-
>>> kali A [Count] B [Count]
>>> James A [Count] B [Count]
>>>
>>> My Hive Answer:-
>>> CREATE EXTERNAL TABLE  TEST
>>> (
>>>  user_name string   ,
>>>  COMMENTS  STRING
>>>
>>> )  ROW FORMAT DELIMITED FIELDS TERMINATED BY '\001'  STORED AS TEXTFILE
>>> LOCATION '/data/kali/test';    HDFS FOLDER (create hdfs folder and
>>> create a text file with data mentioned in the email)
>>>
>>> use default;select user_name,COLLECT_SET(text) from (select
>>> user_name,concat(sub,' ',count(comments)) as text  from test LATERAL VIEW
>>> explode(split(comments,',')) subView AS sub group by user_name,sub)w
>>> group
>>> by user_name;
>>>
>>> Spark With Hive:-
>>> package com.examples
>>>
>>> /**
>>>  * Created by kalit_000 on 17/09/2015.
>>>  */
>>> import org.apache.log4j.Logger
>>> import org.apache.log4j.Level
>>> import org.apache.spark.sql.SQLContext
>>> import org.apache.spark.sql.hive.HiveContext
>>> import org.apache.spark.{SparkContext, SparkConf}
>>> import org.apache.spark.SparkContext._
>>>
>>>
>>> object HiveWordCount {
>>>
>>>   def main(args: Array[String]): Unit =
>>>   {
>>> Logger.getLogger("org").setLevel(Level.WARN)
>>> Logger.getLogger("akka").setLevel(Level.WARN)
>>>
>>> val conf = new
>>>
>>> SparkConf().setMaster("local").setAppName("HiveWordCount").set("spark.executor.memory",
>>> "1g")
>>> val sc = new SparkContext(conf)
>>> val sqlContext= new SQLContext(sc)
>>>
>>> val hc=new HiveContext(sc)
>>>
>>> hc.sql("CREATE EXTERNAL TABLE IF NOT EXISTS default.TEST  (user_name
>>> string ,COMMENTS STRING )ROW FORMAT DELIMITED FIELDS TERMINATED BY '001'
>>> STORED AS TEXTFILE LOCATION '/data/kali/test' ")
>>>
>>> val op=hc.sql("select user_name,COLLECT_SET(text) from (select
>>> user_name,concat(sub,' ',count(comments)) as text  from default.test
>>> LATERAL
>>> VIEW explode(split(comments,',')) subView AS sub group by user_name,sub)w
>>> group by user_name")
>>>
>>> op.collect.foreach(println)
>>>
>>>
>>>   }
>>>
>>>
>>>
>>>
>>> Thanks
>>>
>>>
>>>
>>>
>>> --
>>> View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/word-count-group-by-users-in-spark-tp24748.html
>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>
>>> -
>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>
>>>


Re: Problem at sbt/sbt assembly

2015-09-21 Thread Sean Owen
Sbt asked for a bigger initial heap than the host had space for. It is a
JVM error you can and should search for first. You will need more memory.

On Mon, Sep 21, 2015, 2:11 AM Aaroncq4 <475715...@qq.com> wrote:

> When I used "sbt/sbt assembly" to compile the Spark code of spark-1.5.0, I got a
> problem and I did not know why. It says:
>
> NOTE: The sbt/sbt script has been relocated to build/sbt.
>   Please update references to point to the new location.
>
>   Invoking 'build/sbt assembly' now ...
>
> Using /usr/lib/jvm/java-7-oracle as default JAVA_HOME.
> Note, this will be overridden by -java-home if it is set.
> Error occurred during initialization of VM
> Could not reserve enough space for object heap
> Error: Could not create the Java Virtual Machine.
> Error: A fatal exception has occurred. Program will exit.
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Problem-at-sbt-sbt-assembly-tp24752.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Fwd: Issue with high no of skipped task

2015-09-21 Thread Saurav Sinha
Hi Users,

I am new to Spark and have written a flow. When we first deployed our code it was
completing jobs in 4-5 min, but now it is taking 20+ min to complete with
almost the same set of data. Can you please help me figure out the reason for it?

-- 
Thanks and Regards,

Saurav Sinha

Contact: 9742879062





Deploying spark-streaming application on production

2015-09-21 Thread Jeetendra Gangele
Hi All,

I have an spark streaming application with batch (10 ms) which is reading
the MQTT channel and dumping the data from MQTT to HDFS.

So suppose if I have to deploy new application jar(with changes in spark
streaming application) what is the best way to deploy, currently I am doing
as below

1.killing the running streaming app using yarn application -kill ID
2. and then starting the application again

Problem with above approach is since we are not persisting the events in
MQTT we will miss the events for the period of deploy.

how to handle this case?

regards
jeeetndra


Spark Lost executor && shuffle.FetchFailedException

2015-09-21 Thread biyan900116
Hi All:

When I write data to a Hive dynamic partition table, many errors and
warnings like the following happen...

Is the reason that the shuffle output is so large?

=
15/09/21 14:53:09 ERROR cluster.YarnClusterScheduler: Lost executor 402 on 
dn03.datanode.com: remote Rpc client disassociated

=
15/09/21 14:53:27 WARN scheduler.TaskSetManager: Lost task 107.0 in stage 7.0 
(TID 27601, dn01.datanode.com): FetchFailed(BlockManagerId(513, 
dn02.datanode.com, 34869), shuffleId=1, mapId=90, reduceId=107, message=
org.apache.spark.shuffle.FetchFailedException: Failed to connect to 
dn02.datanode.com/XX.XX.XX.17:34869
at 
org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.org$apache$spark$shuffle$hash$BlockStoreShuffleFetcher$$unpackBlock$1(BlockStoreShuffleFetcher.scala:67)
at 
org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:84)
at 
org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:84)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
at 
org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
at 
org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at 
org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:216)
at 
org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:61)
at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:90)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
at org.apache.spark.scheduler.Task.run(Task.scala:70)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:724)
Caused by: java.io.IOException: Failed to connect to 
dn02.datanode.com/XX.XX.XX.17:34869
at 
org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:193)
at 
org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:156)
at 
org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:88)
at 
org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140)
at 
org.apache.spark.network.shuffle.RetryingBlockFetcher.access$200(RetryingBlockFetcher.java:43)
at 
org.apache.spark.network.shuffle.RetryingBlockFetcher$1.run(RetryingBlockFetcher.java:170)
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:334)
at java.util.concurrent.FutureTask.run(FutureTask.java:166)
... 3 more
Caused by: java.net.ConnectException: Connection refused: dn02.datanode.com/XX.XX.XX.17:34869
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at 
sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:708)
at 
io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:208)
at 
io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:287)
at 
io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:528)
at 
io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
at 
io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
at 
io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
... 1 more

)



Issue with high no of skipped task

2015-09-21 Thread Saurav Sinha
Hi Users,

I am new to Spark and have written a flow. When we first deployed our code it was
completing jobs in 4-5 min, but now it is taking 20+ min to complete with
almost the same set of data. Can you please help me figure out the reason for it?

-- 
Thanks and Regards,

Saurav Sinha

Contact: 9742879062




Hbase Spark streaming issue.

2015-09-21 Thread Siva
Hi,

I'm seeing a strange error while inserting data from Spark Streaming into
HBase.

I am able to write the data from Spark (without streaming) to HBase
successfully, but when I use the same code to write a DStream I see the
below error.

I tried setting the below parameters, but it still didn't help. Did anyone face a
similar issue?

conf.set("hbase.defaults.for.version.skip", "true")
conf.set("hbase.defaults.for.version", "0.98.4.2.2.4.2-2-hadoop2")

15/09/20 22:39:10 ERROR Executor: Exception in task 0.0 in stage 14.0 (TID
16)
java.lang.RuntimeException: hbase-default.xml file seems to be for and old
version of HBase (null), this version is 0.98.4.2.2.4.2-2-hadoop2
at
org.apache.hadoop.hbase.HBaseConfiguration.checkDefaultsVersion(HBaseConfiguration.java:73)
at
org.apache.hadoop.hbase.HBaseConfiguration.addHbaseResources(HBaseConfiguration.java:105)
at
org.apache.hadoop.hbase.HBaseConfiguration.create(HBaseConfiguration.java:116)
at
org.apache.hadoop.hbase.HBaseConfiguration.create(HBaseConfiguration.java:125)
at
$line51.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$HBaseConn$.hbaseConnection(:49)
at
$line52.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$TestHbaseSpark$$anonfun$run$1$$anonfun$apply$1.apply(:73)
at
$line52.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$TestHbaseSpark$$anonfun$run$1$$anonfun$apply$1.apply(:73)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at
org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:782)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:782)
at
org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1353)
at
org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1353)
at
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
at org.apache.spark.scheduler.Task.run(Task.scala:56)
at
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
15/09/20 22:39:10 WARN TaskSetManager: Lost task 0.0 in stage 14.0 (TID 16,
localhost): java.lang.RuntimeException: hbase-default.xml file seems to be
for and old version of HBase (null), this version is
0.98.4.2.2.4.2-2-hadoop2


Thanks,
Siva.


Re: Remove duplicate keys by always choosing first in file.

2015-09-21 Thread Philip Weaver
Hmm, I don't think that's what I want. There's no "zero value" in my use
case.

On Mon, Sep 21, 2015 at 8:20 PM, Sean Owen  wrote:

> I think foldByKey is much more what you want, as it has more a notion
> of building up some result per key by encountering values serially.
> You would take the first and ignore the rest. Note that "first"
> depends on your RDD having an ordering to begin with, or else you rely
> on however it happens to be ordered after whatever operations give you
> a key-value RDD.
>
> On Tue, Sep 22, 2015 at 1:26 AM, Philip Weaver 
> wrote:
> > I am processing a single file and want to remove duplicate rows by some
> key
> > by always choosing the first row in the file for that key.
> >
> > The best solution I could come up with is to zip each row with the
> partition
> > index and local index, like this:
> >
> > rdd.mapPartitionsWithIndex { case (partitionIndex, rows) =>
> >   rows.zipWithIndex.map { case (row, localIndex) => (row.key,
> > ((partitionIndex, localIndex), row)) }
> > }
> >
> >
> > And then using reduceByKey with a min ordering on the (partitionIndex,
> > localIndex) pair.
> >
> > First, can i count on SparkContext.textFile to read the lines in such
> that
> > the partition indexes are always increasing so that the above works?
> >
> > And, is there a better way to accomplish the same effect?
> >
> > Thanks!
> >
> > - Philip
> >
>


Spark Ingestion into Relational DB

2015-09-21 Thread Sri
Hi,

We have a use case where we get data from different systems and finally the
data will be consolidated into an Oracle database. Is Spark a valid
option for this scenario? Currently we also don't have any big data
component. In case we go with Spark to ingest the data, does it require
Hadoop?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Ingestion-into-Relational-DB-tp24761.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: Remove duplicate keys by always choosing first in file.

2015-09-21 Thread Sean Owen
The zero value here is None. Combining None with any row should yield
Some(row). After that, combining is a no-op for other rows.

On Tue, Sep 22, 2015 at 4:27 AM, Philip Weaver  wrote:
> Hmm, I don't think that's what I want. There's no "zero value" in my use
> case.
>
> On Mon, Sep 21, 2015 at 8:20 PM, Sean Owen  wrote:
>>
>> I think foldByKey is much more what you want, as it has more a notion
>> of building up some result per key by encountering values serially.
>> You would take the first and ignore the rest. Note that "first"
>> depends on your RDD having an ordering to begin with, or else you rely
>> on however it happens to be ordered after whatever operations give you
>> a key-value RDD.
>>
>> On Tue, Sep 22, 2015 at 1:26 AM, Philip Weaver 
>> wrote:
>> > I am processing a single file and want to remove duplicate rows by some
>> > key
>> > by always choosing the first row in the file for that key.
>> >
>> > The best solution I could come up with is to zip each row with the
>> > partition
>> > index and local index, like this:
>> >
>> > rdd.mapPartitionsWithIndex { case (partitionIndex, rows) =>
>> >   rows.zipWithIndex.map { case (row, localIndex) => (row.key,
>> > ((partitionIndex, localIndex), row)) }
>> > }
>> >
>> >
>> > And then using reduceByKey with a min ordering on the (partitionIndex,
>> > localIndex) pair.
>> >
>> > First, can i count on SparkContext.textFile to read the lines in such
>> > that
>> > the partition indexes are always increasing so that the above works?
>> >
>> > And, is there a better way to accomplish the same effect?
>> >
>> > Thanks!
>> >
>> > - Philip
>> >
>
>




Re: Remove duplicate keys by always choosing first in file.

2015-09-21 Thread Sean Owen
Yes, that's right, though "in order" depends on the RDD having an
ordering, but so does the zip-based solution.

Actually, I'm going to walk that back a bit, since I don't see a
guarantee that foldByKey behaves like foldLeft. The implementation
underneath, in combineByKey, appears that it will act this way in
practice though.
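
A runnable sketch of the foldByKey pattern described here, using a toy keyed RDD as a stand-in for the real data, and with the same caveat that the encounter order within a key is only what the implementation happens to do in practice:

import org.apache.spark.{SparkConf, SparkContext}

object FirstValuePerKey {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("FirstValuePerKey"))

    // toy stand-in for rows keyed by some key, in file order
    val rows = sc.parallelize(Seq(("k1", "first"), ("k2", "x"), ("k1", "second"), ("k2", "y")))

    val firstPerKey = rows
      .mapValues(Option(_))
      .foldByKey(None: Option[String]) {
        case (acc @ Some(_), _) => acc  // a value was already chosen for this key: keep it
        case (None, v)          => v    // first value encountered wins
      }
      .mapValues(_.get)                 // safe: every key has at least one value

    firstPerKey.collect().foreach(println)
    sc.stop()
  }
}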

On Tue, Sep 22, 2015 at 4:45 AM, Philip Weaver  wrote:
> Hmm, ok, but I'm not seeing why foldByKey is more appropriate than
> reduceByKey? Specifically, is foldByKey guaranteed to walk the RDD in order,
> but reduceByKey is not?
>
> On Mon, Sep 21, 2015 at 8:41 PM, Sean Owen  wrote:
>>
>> The zero value here is None. Combining None with any row should yield
>> Some(row). After that, combining is a no-op for other rows.
>>
>> On Tue, Sep 22, 2015 at 4:27 AM, Philip Weaver 
>> wrote:
>> > Hmm, I don't think that's what I want. There's no "zero value" in my use
>> > case.
>> >
>> > On Mon, Sep 21, 2015 at 8:20 PM, Sean Owen  wrote:
>> >>
>> >> I think foldByKey is much more what you want, as it has more a notion
>> >> of building up some result per key by encountering values serially.
>> >> You would take the first and ignore the rest. Note that "first"
>> >> depends on your RDD having an ordering to begin with, or else you rely
>> >> on however it happens to be ordered after whatever operations give you
>> >> a key-value RDD.
>> >>
>> >> On Tue, Sep 22, 2015 at 1:26 AM, Philip Weaver
>> >> 
>> >> wrote:
>> >> > I am processing a single file and want to remove duplicate rows by
>> >> > some
>> >> > key
>> >> > by always choosing the first row in the file for that key.
>> >> >
>> >> > The best solution I could come up with is to zip each row with the
>> >> > partition
>> >> > index and local index, like this:
>> >> >
>> >> > rdd.mapPartitionsWithIndex { case (partitionIndex, rows) =>
>> >> >   rows.zipWithIndex.map { case (row, localIndex) => (row.key,
>> >> > ((partitionIndex, localIndex), row)) }
>> >> > }
>> >> >
>> >> >
>> >> > And then using reduceByKey with a min ordering on the
>> >> > (partitionIndex,
>> >> > localIndex) pair.
>> >> >
>> >> > First, can i count on SparkContext.textFile to read the lines in such
>> >> > that
>> >> > the partition indexes are always increasing so that the above works?
>> >> >
>> >> > And, is there a better way to accomplish the same effect?
>> >> >
>> >> > Thanks!
>> >> >
>> >> > - Philip
>> >> >
>> >
>> >
>
>




Re: Remove duplicate keys by always choosing first in file.

2015-09-21 Thread Philip Weaver
Hmm, ok, but I'm not seeing why foldByKey is more appropriate than
reduceByKey? Specifically, is foldByKey guaranteed to walk the RDD in
order, but reduceByKey is not?

On Mon, Sep 21, 2015 at 8:41 PM, Sean Owen  wrote:

> The zero value here is None. Combining None with any row should yield
> Some(row). After that, combining is a no-op for other rows.
>
> On Tue, Sep 22, 2015 at 4:27 AM, Philip Weaver 
> wrote:
> > Hmm, I don't think that's what I want. There's no "zero value" in my use
> > case.
> >
> > On Mon, Sep 21, 2015 at 8:20 PM, Sean Owen  wrote:
> >>
> >> I think foldByKey is much more what you want, as it has more a notion
> >> of building up some result per key by encountering values serially.
> >> You would take the first and ignore the rest. Note that "first"
> >> depends on your RDD having an ordering to begin with, or else you rely
> >> on however it happens to be ordered after whatever operations give you
> >> a key-value RDD.
> >>
> >> On Tue, Sep 22, 2015 at 1:26 AM, Philip Weaver  >
> >> wrote:
> >> > I am processing a single file and want to remove duplicate rows by
> some
> >> > key
> >> > by always choosing the first row in the file for that key.
> >> >
> >> > The best solution I could come up with is to zip each row with the
> >> > partition
> >> > index and local index, like this:
> >> >
> >> > rdd.mapPartitionsWithIndex { case (partitionIndex, rows) =>
> >> >   rows.zipWithIndex.map { case (row, localIndex) => (row.key,
> >> > ((partitionIndex, localIndex), row)) }
> >> > }
> >> >
> >> >
> >> > And then using reduceByKey with a min ordering on the (partitionIndex,
> >> > localIndex) pair.
> >> >
> >> > First, can i count on SparkContext.textFile to read the lines in such
> >> > that
> >> > the partition indexes are always increasing so that the above works?
> >> >
> >> > And, is there a better way to accomplish the same effect?
> >> >
> >> > Thanks!
> >> >
> >> > - Philip
> >> >
> >
> >
>


Re: How does one use s3 for checkpointing?

2015-09-21 Thread Utkarsh Sengar
We are using "spark-1.4.1-bin-hadoop2.4" on mesos (not EMR) with s3 to read
and write data and haven't noticed any inconsistencies with it, so 1
(mostly) and 2 definitely should not be a problem.
Regarding 3, are you setting the file system impl in spark config?

sparkContext.hadoopConfiguration().set("fs.s3.impl",
"org.apache.hadoop.fs.s3native.NativeS3FileSystem");

And I have these dependencies if that helps.


<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core_2.10</artifactId>
  <version>1.4.1</version>
</dependency>
<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-mapreduce-client-core</artifactId>
  <version>2.4.1</version>
</dependency>


-Utkarsh

On Mon, Sep 21, 2015 at 7:13 PM, Jerry Lam  wrote:

> Hi Amit,
>
> Have you looked at Amazon EMR? Most people using EMR use s3 for
> persistency (both as input and output of spark jobs).
>
> Best Regards,
>
> Jerry
>
> Sent from my iPhone
>
> On 21 Sep, 2015, at 9:24 pm, Amit Ramesh  wrote:
>
>
> A lot of places in the documentation mention using s3 for checkpointing,
> however I haven't found any examples or concrete evidence of anyone having
> done this.
>
>1. Is this a safe/reliable option given the read-after-write
>consistency for PUTS in s3?
>2. Is s3 access broken for hadoop 2.6 (SPARK-7442
>)? If so, is it
>viable in 2.4?
>3. Related to #2. I did try providing hadoop-aws-2.6.0.jar while
>submitting the job and got the following stack trace. Is there a fix?
>
> py4j.protocol.Py4JJavaError: An error occurred while calling
> None.org.apache.spark.api.java.JavaSparkContext.
> : java.util.ServiceConfigurationError: org.apache.hadoop.fs.FileSystem:
> Provider org.apache.hadoop.fs.s3a.S3AFileSystem could not be instantiated
> at java.util.ServiceLoader.fail(ServiceLoader.java:224)
> at java.util.ServiceLoader.access$100(ServiceLoader.java:181)
> at
> java.util.ServiceLoader$LazyIterator.next(ServiceLoader.java:377)
> at java.util.ServiceLoader$1.next(ServiceLoader.java:445)
> at
> org.apache.hadoop.fs.FileSystem.loadFileSystems(FileSystem.java:2563)
> at
> org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2574)
> at
> org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2591)
> at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:91)
> at
> org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2630)
> at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2612)
> at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:370)
> at org.apache.hadoop.fs.Path.getFileSystem(Path.java:296)
> at org.apache.spark.SparkContext.addFile(SparkContext.scala:1354)
> at org.apache.spark.SparkContext.addFile(SparkContext.scala:1332)
> at
> org.apache.spark.SparkContext$$anonfun$15.apply(SparkContext.scala:475)
> at
> org.apache.spark.SparkContext$$anonfun$15.apply(SparkContext.scala:475)
> at scala.collection.immutable.List.foreach(List.scala:318)
> at org.apache.spark.SparkContext.(SparkContext.scala:475)
> at
> org.apache.spark.api.java.JavaSparkContext.(JavaSparkContext.scala:61)
> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native
> Method)
> at
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
> at
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
> at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:234)
> at
> py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
> at py4j.Gateway.invoke(Gateway.java:214)
> at
> py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:79)
> at
> py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:68)
> at py4j.GatewayConnection.run(GatewayConnection.java:207)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.NoClassDefFoundError:
> com/amazonaws/AmazonServiceException
> at java.lang.Class.getDeclaredConstructors0(Native Method)
> at java.lang.Class.privateGetDeclaredConstructors(Class.java:2585)
> at java.lang.Class.getConstructor0(Class.java:2885)
> at java.lang.Class.newInstance(Class.java:350)
> at
> java.util.ServiceLoader$LazyIterator.next(ServiceLoader.java:373)
> ... 27 more
> Caused by: java.lang.ClassNotFoundException:
> com.amazonaws.AmazonServiceException
> at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
> at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
> at java.security.AccessController.doPrivileged(Native Method)
> at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
> at 

Invalid checkpoint url

2015-09-21 Thread srungarapu vamsi
I am using reduceByKeyAndWindow (with an inverse reduce function) in my code.
In order to use this, it seems the checkpointDirectory I have to use
should be a Hadoop-compatible file system.
Does that mean I should set up Hadoop on my system?
I googled this and found in a Stack Overflow answer that I need not set up HDFS,
but the checkpoint directory should be HDFS-compatible.

I am a beginner in this area. I am running my Spark Streaming application
on Ubuntu 14.04, Spark 1.3.1.
If I need not set up HDFS and ext4 is HDFS-compatible, then what should
my checkpoint directory look like?

I tried all these:
ssc.checkpoint("/tmp/checkpoint")
ssc.checkpoint("hdfs:///tmp/checkpoint")
ssc.checkpoint("file:///tmp/checkpoint")

But none of them worked for me.

-- 
/Vamsi
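
For reference, a minimal sketch of a checkpointed reduceByKeyAndWindow job as it is commonly set up for local runs without an HDFS install; the host, port, window sizes, and the file:// path are placeholders, and the directory must exist and be writable by the Spark user:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setMaster("local[2]").setAppName("WindowedCounts")
val ssc = new StreamingContext(conf, Seconds(10))

// A local directory given with an explicit file:// scheme; on a cluster this would
// normally be an HDFS (or other Hadoop-compatible) path instead.
ssc.checkpoint("file:///tmp/spark-checkpoint")

val counts = ssc.socketTextStream("localhost", 9999)
  .map(word => (word, 1))
  .reduceByKeyAndWindow(_ + _, _ - _, Seconds(60), Seconds(10))

counts.print()
ssc.start()
ssc.awaitTermination()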


Re: Spark Ingestion into Relational DB

2015-09-21 Thread ayan guha
No, it does not require hadoop.


However, I doubt this is a good use case for Spark. You would probably
be better off, and gain better performance, with sqlloader.
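
If Spark is used anyway, a hedged sketch of what the final load step could look like with the DataFrame JDBC writer (Spark 1.4+), assuming an existing SparkContext sc; the source path, JDBC URL, credentials, and table name are placeholders, and the Oracle JDBC driver jar has to be on the classpath:

import java.util.Properties
import org.apache.spark.sql.{SQLContext, SaveMode}

val sqlContext = new SQLContext(sc)

// placeholder source: whatever consolidated DataFrame the upstream systems produce
val consolidated = sqlContext.read.json("/staging/source-a/")

val props = new Properties()
props.setProperty("user", "scott")        // placeholder credentials
props.setProperty("password", "tiger")

consolidated.write
  .mode(SaveMode.Append)
  .jdbc("jdbc:oracle:thin:@//dbhost:1521/ORCL", "TARGET_TABLE", props)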


On Tue, Sep 22, 2015 at 3:13 PM, Sri  wrote:

> Hi,
>
> We have a usecase  where we get the dated from different systems and
> finally
> data will be consolidated into Oracle Database. Does spark is a valid
> useless for this scenario. Currently we also don't have any big data
> component. In case if we go with Spark to ingest data, does it require
> hadoop.
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Ingestion-into-Relational-DB-tp24761.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


-- 
Best Regards,
Ayan Guha


Re: Spark Ingestion into Relational DB

2015-09-21 Thread Jörn Franke
You do not need Hadoop. However, you should think about using it. If you
use Spark to load data directly from Oracle, then your database might see
unexpected load if a Spark node fails. Additionally, the
Oracle database, if it is not based on local disk, may have a storage
bottleneck. Furthermore, Spark standalone has no resource management
mechanism for supporting different SLAs; you may need YARN (Hadoop) for
that. Finally, using the Oracle database for storing all the data may be an
expensive exercise. What I have often seen is that Hadoop is used for
storing all the data and managing the resources. Spark can be used for
machine learning over this data, and the Oracle database (or any relational
datastore, NoSQL database, or in-memory DB) is used to serve the data to a lot
of users. This is also the basic idea behind the lambda architecture.

Le mar. 22 sept. 2015 à 7:13, Sri  a écrit :

> Hi,
>
> We have a usecase  where we get the dated from different systems and
> finally
> data will be consolidated into Oracle Database. Does spark is a valid
> useless for this scenario. Currently we also don't have any big data
> component. In case if we go with Spark to ingest data, does it require
> hadoop.
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Ingestion-into-Relational-DB-tp24761.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Long GC pauses with Spark SQL 1.3.0 and billion row tables

2015-09-21 Thread tridib
Did you get any solution to this? I am getting same issue.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Long-GC-pauses-with-Spark-SQL-1-3-0-and-billion-row-tables-tp22750p24759.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Remove duplicate keys by always choosing first in file.

2015-09-21 Thread Philip Weaver
I am processing a single file and want to remove duplicate rows by some key
by always choosing the first row in the file for that key.

The best solution I could come up with is to zip each row with the
partition index and local index, like this:

rdd.mapPartitionsWithIndex { case (partitionIndex, rows) =>
  rows.zipWithIndex.map { case (row, localIndex) => (row.key,
((partitionIndex, localIndex), row)) }
}


And then using reduceByKey with a min ordering on the (partitionIndex,
localIndex) pair.

First, can i count on SparkContext.textFile to read the lines in such that
the partition indexes are always increasing so that the above works?

And, is there a better way to accomplish the same effect?

Thanks!

- Philip
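
A hedged completion of the approach sketched above, with a comma-separated first field standing in for row.key (the key extractor and file path are placeholders, and the same open question applies about whether partition order matches file order):

import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("FirstRowPerKey"))
val rdd = sc.textFile("/path/to/data.txt")   // placeholder path

val firstByKey = rdd
  .mapPartitionsWithIndex { case (partitionIndex, rows) =>
    rows.zipWithIndex.map { case (row, localIndex) =>
      val key = row.split(",")(0)            // assumed key extractor
      (key, ((partitionIndex, localIndex), row))
    }
  }
  .reduceByKey { (a, b) =>
    // keep the row with the smaller (partitionIndex, localIndex) pair
    val ((pA, iA), _) = a
    val ((pB, iB), _) = b
    if (pA < pB || (pA == pB && iA < iB)) a else b
  }
  .mapValues { case (_, row) => row }

firstByKey.collect().foreach(println)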


Re: passing SparkContext as parameter

2015-09-21 Thread Priya Ch
Can I use this sparkContext on executors?
In my application, I have a scenario of reading from a DB for certain records
in an RDD. Hence I need the sparkContext to read from the DB (Cassandra in our case).

If the sparkContext can't be sent to executors, what is the workaround for
this?

On Mon, Sep 21, 2015 at 3:06 PM, Petr Novak  wrote:

> add @transient?
>
> On Mon, Sep 21, 2015 at 11:27 AM, Priya Ch 
> wrote:
>
>> Hello All,
>>
>> How can i pass sparkContext as a parameter to a method in an object.
>> Because passing sparkContext is giving me TaskNotSerializable Exception.
>>
>> How can i achieve this ?
>>
>> Thanks,
>> Padma Ch
>>
>
>


Re: How to get a new RDD by ordinarily subtract its adjacent rows

2015-09-21 Thread Romi Kuntsman
RDD is a set of data rows (in your case numbers), there is no meaning for
the order of the items.
What exactly are you trying to accomplish?

*Romi Kuntsman*, *Big Data Engineer*
http://www.totango.com
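
For what it's worth, a minimal sketch of one way to impose an order explicitly and take adjacent differences, assuming rdd1 holds numeric values (toy data below):

import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("AdjacentDiff"))
val rdd1 = sc.parallelize(Seq(5.0, 8.0, 20.0, 21.0))

// give every element an explicit position, then join each position i with position i - 1
val indexed = rdd1.zipWithIndex().map { case (v, i) => (i, v) }
val previous = indexed.map { case (i, v) => (i + 1, v) }

val rdd2 = indexed.join(previous)          // (i, (rdd1(i), rdd1(i - 1))) for i >= 1
  .sortByKey()
  .map { case (_, (cur, prev)) => cur - prev }

rdd2.collect().foreach(println)            // 3.0, 12.0, 1.0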

On Mon, Sep 21, 2015 at 2:29 PM, Zhiliang Zhu 
wrote:

> Dear ,
>
> I have took lots of days to think into this issue, however, without any
> success...
> I shall appreciate your all kind help.
>
> There is an RDD rdd1, I would like get a new RDD rdd2, each row
> in rdd2[ i ] = rdd1[ i ] - rdd[i - 1] .
> What kinds of API or function would I use...
>
>
> Thanks very much!
> John
>
>


Re: passing SparkContext as parameter

2015-09-21 Thread Priya Ch
Yes, but I need to read from the Cassandra DB within a Spark
transformation... something like:

dstream.foreachRDD{

rdd=> rdd.foreach {
 message =>
 sc.cassandraTable()
  .
  .
  .
}
}

Since rdd.foreach gets executed on workers, how can I make the sparkContext
available on workers?

Regards,
Padma Ch

On Mon, Sep 21, 2015 at 5:10 PM, Ted Yu  wrote:

> You can use broadcast variable for passing connection information.
>
> Cheers
>
> On Sep 21, 2015, at 4:27 AM, Priya Ch 
> wrote:
>
> can i use this sparkContext on executors ??
> In my application, i have scenario of reading from db for certain records
> in rdd. Hence I need sparkContext to read from DB (cassandra in our case),
>
> If sparkContext couldn't be sent to executors , what is the workaround for
> this ??
>
> On Mon, Sep 21, 2015 at 3:06 PM, Petr Novak  wrote:
>
>> add @transient?
>>
>> On Mon, Sep 21, 2015 at 11:27 AM, Priya Ch 
>> wrote:
>>
>>> Hello All,
>>>
>>> How can i pass sparkContext as a parameter to a method in an object.
>>> Because passing sparkContext is giving me TaskNotSerializable Exception.
>>>
>>> How can i achieve this ?
>>>
>>> Thanks,
>>> Padma Ch
>>>
>>
>>
>


Re: passing SparkContext as parameter

2015-09-21 Thread Romi Kuntsman
sparkConext is available on the driver, not on executors.

To read from Cassandra, you can use something like this:
https://github.com/datastax/spark-cassandra-connector/blob/master/doc/2_loading.md

*Romi Kuntsman*, *Big Data Engineer*
http://www.totango.com

On Mon, Sep 21, 2015 at 2:27 PM, Priya Ch 
wrote:

> can i use this sparkContext on executors ??
> In my application, i have scenario of reading from db for certain records
> in rdd. Hence I need sparkContext to read from DB (cassandra in our case),
>
> If sparkContext couldn't be sent to executors , what is the workaround for
> this ??
>
> On Mon, Sep 21, 2015 at 3:06 PM, Petr Novak  wrote:
>
>> add @transient?
>>
>> On Mon, Sep 21, 2015 at 11:27 AM, Priya Ch 
>> wrote:
>>
>>> Hello All,
>>>
>>> How can i pass sparkContext as a parameter to a method in an object.
>>> Because passing sparkContext is giving me TaskNotSerializable Exception.
>>>
>>> How can i achieve this ?
>>>
>>> Thanks,
>>> Padma Ch
>>>
>>
>>
>


Re: passing SparkContext as parameter

2015-09-21 Thread Ted Yu
You can use broadcast variable for passing connection information. 

Cheers

> On Sep 21, 2015, at 4:27 AM, Priya Ch  wrote:
> 
> can i use this sparkContext on executors ??
> In my application, i have scenario of reading from db for certain records in 
> rdd. Hence I need sparkContext to read from DB (cassandra in our case),
> 
> If sparkContext couldn't be sent to executors , what is the workaround for 
> this ??
> 
>> On Mon, Sep 21, 2015 at 3:06 PM, Petr Novak  wrote:
>> add @transient?
>> 
>>> On Mon, Sep 21, 2015 at 11:27 AM, Priya Ch  
>>> wrote:
>>> Hello All,
>>> 
>>> How can i pass sparkContext as a parameter to a method in an object. 
>>> Because passing sparkContext is giving me TaskNotSerializable Exception.
>>> 
>>> How can i achieve this ?
>>> 
>>> Thanks,
>>> Padma Ch
> 


Re: passing SparkContext as parameter

2015-09-21 Thread Romi Kuntsman
foreach is something that runs on the driver, not the workers.

if you want to perform some function on each record from cassandra, you
need to do cassandraRdd.map(func), which will run distributed on the spark
workers

*Romi Kuntsman*, *Big Data Engineer*
http://www.totango.com
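
For completeness, a hedged sketch of the usual pattern when each record needs a Cassandra lookup inside a streaming job: the SparkContext stays on the driver, and the executor-side code uses the DataStax connector's serializable CassandraConnector instead. The dstream, keyspace, table, and the message's id field are placeholders mirroring the pseudocode earlier in the thread:

import com.datastax.spark.connector.cql.CassandraConnector

val connector = CassandraConnector(ssc.sparkContext.getConf)   // created on the driver, serializable

dstream.foreachRDD { rdd =>
  rdd.foreachPartition { messages =>
    // runs on an executor: open one session per partition, not per record
    connector.withSessionDo { session =>
      messages.foreach { message =>
        session.execute("SELECT * FROM my_keyspace.my_table WHERE id = ?", message.id)
      }
    }
  }
}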

On Mon, Sep 21, 2015 at 3:29 PM, Priya Ch 
wrote:

> Yes, but i need to read from cassandra db within a spark
> transformation..something like..
>
> dstream.forachRDD{
>
> rdd=> rdd.foreach {
>  message =>
>  sc.cassandraTable()
>   .
>   .
>   .
> }
> }
>
> Since rdd.foreach gets executed on workers, how can i make sparkContext
> available on workers ???
>
> Regards,
> Padma Ch
>
> On Mon, Sep 21, 2015 at 5:10 PM, Ted Yu  wrote:
>
>> You can use broadcast variable for passing connection information.
>>
>> Cheers
>>
>> On Sep 21, 2015, at 4:27 AM, Priya Ch 
>> wrote:
>>
>> can i use this sparkContext on executors ??
>> In my application, i have scenario of reading from db for certain records
>> in rdd. Hence I need sparkContext to read from DB (cassandra in our case),
>>
>> If sparkContext couldn't be sent to executors , what is the workaround
>> for this ??
>>
>> On Mon, Sep 21, 2015 at 3:06 PM, Petr Novak  wrote:
>>
>>> add @transient?
>>>
>>> On Mon, Sep 21, 2015 at 11:27 AM, Priya Ch >> > wrote:
>>>
 Hello All,

 How can i pass sparkContext as a parameter to a method in an
 object. Because passing sparkContext is giving me TaskNotSerializable
 Exception.

 How can i achieve this ?

 Thanks,
 Padma Ch

>>>
>>>
>>
>


Re: Spark does not yet support its JDBC component for Scala 2.11.

2015-09-21 Thread Ted Yu
I think the document should be updated to reflect the integration of
SPARK-8013 

Cheers

On Mon, Sep 21, 2015 at 3:48 AM, Petr Novak  wrote:

> Nice, thanks.
>
> So the note in build instruction for 2.11 is obsolete? Or there are still
> some limitations?
>
>
> http://spark.apache.org/docs/latest/building-spark.html#building-for-scala-211
>
> On Fri, Sep 11, 2015 at 2:19 PM, Petr Novak  wrote:
>
>> Nice, thanks.
>>
>> So the note in build instruction for 2.11 is obsolete? Or there are still
>> some limitations?
>>
>>
>> http://spark.apache.org/docs/latest/building-spark.html#building-for-scala-211
>>
>> On Fri, Sep 11, 2015 at 2:09 PM, Ted Yu  wrote:
>>
>>> Have you looked at:
>>> https://issues.apache.org/jira/browse/SPARK-8013
>>>
>>>
>>>
>>> > On Sep 11, 2015, at 4:53 AM, Petr Novak  wrote:
>>> >
>>> > Does it still apply for 1.5.0?
>>> >
>>> > What actual limitation does it mean when I switch to 2.11? No JDBC
>>> Thriftserver? No JDBC DataSource? No JdbcRDD (which is already obsolete I
>>> believe)? Some more?
>>> >
>>> > What library is the blocker to upgrade JDBC component to 2.11?
>>> >
>>> > Is there any estimate when it could be available for 2.11?
>>> >
>>> > Many thanks,
>>> > Petr
>>>
>>
>>
>


Re: Spark does not yet support its JDBC component for Scala 2.11.

2015-09-21 Thread Petr Novak
Nice, thanks.

So the note in build instruction for 2.11 is obsolete? Or there are still
some limitations?

http://spark.apache.org/docs/latest/building-spark.html#building-for-scala-211

On Fri, Sep 11, 2015 at 2:19 PM, Petr Novak  wrote:

> Nice, thanks.
>
> So the note in build instruction for 2.11 is obsolete? Or there are still
> some limitations?
>
>
> http://spark.apache.org/docs/latest/building-spark.html#building-for-scala-211
>
> On Fri, Sep 11, 2015 at 2:09 PM, Ted Yu  wrote:
>
>> Have you looked at:
>> https://issues.apache.org/jira/browse/SPARK-8013
>>
>>
>>
>> > On Sep 11, 2015, at 4:53 AM, Petr Novak  wrote:
>> >
>> > Does it still apply for 1.5.0?
>> >
>> > What actual limitation does it mean when I switch to 2.11? No JDBC
>> Thriftserver? No JDBC DataSource? No JdbcRDD (which is already obsolete I
>> believe)? Some more?
>> >
>> > What library is the blocker to upgrade JDBC component to 2.11?
>> >
>> > Is there any estimate when it could be available for 2.11?
>> >
>> > Many thanks,
>> > Petr
>>
>
>


How to get a new RDD by ordinarily subtract its adjacent rows

2015-09-21 Thread Zhiliang Zhu
Dear ,

I have took lots of days to think into this issue, however, without any
success...
I shall appreciate your all kind help.

There is an RDD rdd1, I would like get a new RDD rdd2, each row
in rdd2[ i ] = rdd1[ i ] - rdd[i - 1] .
What kinds of API or function would I use...

Thanks very much!
John


Re: DataGenerator for streaming application

2015-09-21 Thread Hemant Bhanawat
Why are you using  rawSocketStream to read the data? I believe
rawSocketStream waits for a big chunk of data before it can start
processing it. I think what you are writing is a String and you should use
socketTextStream which reads the data on a per line basis.
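
A minimal sketch of that change as it could be dropped into the Benchmark object quoted below (same host, port, and storage-level arguments as the original code):

// read newline-delimited text instead of pre-serialized blocks
val lines = (1 to numStreams).map(_ =>
  ssc.socketTextStream(host, port, StorageLevel.MEMORY_ONLY_SER))
val union = ssc.union(lines)
union.count().map(c => s"Received $c records").print()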

On Sun, Sep 20, 2015 at 9:56 AM, Saiph Kappa  wrote:

> Hi,
>
> I am trying to build a data generator that feeds a streaming application.
> This data generator just reads a file and sends its lines through a socket.
> I get no errors in the logs, and the benchmark below always prints
> "Received 0 records". Am I doing something wrong?
>
>
> object MyDataGenerator {
>
>   def main(args: Array[String]) {
> if (args.length != 3) {
>   System.err.println("Usage: RawTextSender   ")
>   System.exit(1)
> }
> // Parse the arguments using a pattern match
> val (port, file, sleepMillis) = (args(0).toInt, args(1), args(2).toInt)
>
> val serverSocket = new ServerSocket(port)
> println("Listening on port " + port)
>
>
> while (true) {
>   val socket = serverSocket.accept()
>   println("Got a new connection")
>
>
>   val out = new PrintWriter(socket.getOutputStream)
>   try {
> var count = 0
> var startTimestamp = -1
> for (line <- Source.fromFile(file).getLines()) {
>   val ts = line.substring(2, line.indexOf(',',2)).toInt
>   if(startTimestamp < 0)
> startTimestamp = ts
>
>   if(ts - startTimestamp <= 30) {
> out.println(line)
> count += 1
>   } else {
> println(s"Emmited reports: $count")
> count = 0
> out.flush()
> startTimestamp = ts
> Thread.sleep(sleepMillis)
>   }
> }
>   } catch {
> case e: IOException =>
>   println("Client disconnected")
>   socket.close()
>   }
> }
> }
> }
>
>
>
> object Benchmark {
>   def main(args: Array[String]) {
> if (args.length != 4) {
>   System.err.println("Usage: RawNetworkGrep
> ")
>   System.exit(1)
> }
>
> val (numStreams, host, port, batchMillis) = (args(0).toInt, args(1), 
> args(2).toInt, args(3).toInt)
> val sparkConf = new SparkConf()
> sparkConf.setAppName("BenchMark")
> 
> sparkConf.setJars(Array("target/scala-2.10/benchmark-app_2.10-0.1-SNAPSHOT.jar"))
> sparkConf.set("spark.serializer", 
> "org.apache.spark.serializer.KryoSerializer")
> sparkConf.set("spark.executor.extraJavaOptions", " -XX:+UseCompressedOops 
> -XX:+UseConcMarkSweepGC -XX:+AggressiveOpts -XX:FreqInlineSize=300 
> -XX:MaxInlineSize=300 ")
> if (sparkConf.getOption("spark.master") == None) {
>   // Master not set, as this was not launched through Spark-submit. 
> Setting master as local."
>   sparkConf.setMaster("local[*]")
> }
>
> // Create the context
> val ssc = new StreamingContext(sparkConf, Duration(batchMillis))
>
> val rawStreams = (1 to numStreams).map(_ =>
>   ssc.rawSocketStream[String](host, port, 
> StorageLevel.MEMORY_ONLY_SER)).toArray
> val union = ssc.union(rawStreams)
> union.count().map(c => s"Received $c records").print()
> ssc.start()
> ssc.awaitTermination()
>   }
> }
>
> Thanks.
>
>


Re: HiveQL Compatibility (0.12.0, 0.13.0???)

2015-09-21 Thread Michael Armbrust
In general we welcome pull requests for these kind of updates.  In this
case its already been fixed in master and branch-1.5 and will be updated
when we release 1.5.1 (hopefully soon).

On Mon, Sep 21, 2015 at 1:21 PM, Dominic Ricard <
dominic.ric...@tritondigital.com> wrote:

> Hi,
>here's a statement from the Spark 1.5.0  Spark SQL and DataFrame Guide
> <
> https://spark.apache.org/docs/latest/sql-programming-guide.html#compatibility-with-apache-hive
> >
> :
>
> *Compatibility with Apache Hive*
> Spark SQL is designed to be compatible with the Hive Metastore, SerDes and
> UDFs. Currently Spark SQL is based on Hive 0.12.0 and 0.13.1.
>
> After testing many functions available in 1.1.0 and 1.2.0, I tend to think
> that this is no longer true...
>
> Could someone update the documentation or tell me what these versions refer
> to as it appears that Spark SQL 1.5.0 support everything in Hive 1.2.0...
>
> Thank you.
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/HiveQL-Compatibility-0-12-0-0-13-0-tp24757.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Spark on Yarn vs Standalone

2015-09-21 Thread Saisai Shao
I think you need to increase the memory size of executor through command
arguments "--executor-memory", or configuration "spark.executor.memory".

Also yarn.scheduler.maximum-allocation-mb in Yarn side if necessary.

Thanks
Saisai


On Mon, Sep 21, 2015 at 5:13 PM, Alexander Pivovarov 
wrote:

> I noticed that some executors have issue with scratch space.
> I see the following in yarn app container stderr around the time when yarn
> killed the executor because it uses too much memory.
>
> -- App container stderr --
> 15/09/21 21:43:22 WARN storage.MemoryStore: Not enough space to cache
> rdd_6_346 in memory! (computed 3.0 GB so far)
> 15/09/21 21:43:22 INFO storage.MemoryStore: Memory use = 477.6 KB (blocks)
> + 24.4 GB (scratch space shared across 8 thread(s)) = 24.4 GB. Storage
> limit = 25.2 GB.
> 15/09/21 21:43:22 WARN spark.CacheManager: Persisting partition rdd_6_346
> to disk instead.
> 15/09/21 21:43:22 WARN storage.MemoryStore: Not enough space to cache
> rdd_6_49 in memory! (computed 3.1 GB so far)
> 15/09/21 21:43:22 INFO storage.MemoryStore: Memory use = 477.6 KB (blocks)
> + 24.4 GB (scratch space shared across 8 thread(s)) = 24.4 GB. Storage
> limit = 25.2 GB.
> 15/09/21 21:43:22 WARN spark.CacheManager: Persisting partition rdd_6_49
> to disk instead.
>
> -- Yarn Nodemanager log --
> 2015-09-21 21:44:05,716 WARN
> org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl
> (Container Monitor): Container
> [pid=5114,containerID=container_1442869100946_0001_01_0
> 00056] is running beyond physical memory limits. Current usage: 52.2 GB of
> 52 GB physical memory used; 53.0 GB of 260 GB virtual memory used. Killing
> container.
> Dump of the process-tree for container_1442869100946_0001_01_56 :
> |- PID PPID PGRPID SESSID CMD_NAME USER_MODE_TIME(MILLIS)
> SYSTEM_TIME(MILLIS) VMEM_USAGE(BYTES) RSSMEM_USAGE(PAGES) FULL_CMD_LINE
> |- 5117 5114 5114 5114 (java) 1322810 27563 56916316160 13682772
> /usr/lib/jvm/java-openjdk/bin/java -server -XX:OnOutOfMemoryError=kill %p
> -Xms47924m -Xmx47924m -verbose:gc -XX:+PrintGCDetails
> -XX:+PrintGCDateStamps -XX:+UseConcMarkSweepGC
> -XX:CMSInitiatingOccupancyFraction=70 -XX:MaxHeapFreeRatio=70
> -XX:+CMSClassUnloadingEnabled
> -Djava.io.tmpdir=/mnt/yarn/usercache/hadoop/appcache/application_1442869100946_0001/container_1442869100946_0001_01_56/tmp
> -Dspark.akka.failure-detector.threshold=3000.0
> -Dspark.akka.heartbeat.interval=1s -Dspark.akka.threads=4
> -Dspark.history.ui.port=18080 -Dspark.akka.heartbeat.pauses=6s
> -Dspark.akka.timeout=1000s -Dspark.akka.frameSize=50
> -Dspark.driver.port=52690
> -Dspark.yarn.app.container.log.dir=/var/log/hadoop-yarn/containers/application_1442869100946_0001/container_1442869100946_0001_01_56
> org.apache.spark.executor.CoarseGrainedExecutorBackend --driver-url
> akka.tcp://sparkDriver@10.0.24.153:52690/user/CoarseGrainedScheduler
> --executor-id 55 --hostname ip-10-0-28-96.ec2.internal --cores 8 --app-id
> application_1442869100946_0001 --user-class-path
> file:/mnt/yarn/usercache/hadoop/appcache/application_1442869100946_0001/container_1442869100946_0001_01_56/__app__.jar
> |- 5114 5112 5114 5114 (bash) 0 0 9658368 291 /bin/bash -c
> /usr/lib/jvm/java-openjdk/bin/java -server -XX:OnOutOfMemoryError='kill %p'
> -Xms47924m -Xmx47924m '-verbose:gc' '-XX:+PrintGCDetails'
> '-XX:+PrintGCDateStamps' '-XX:+UseConcMarkSweepGC'
> '-XX:CMSInitiatingOccupancyFraction=70' '-XX:MaxHeapFreeRatio=70'
> '-XX:+CMSClassUnloadingEnabled'
> -Djava.io.tmpdir=/mnt/yarn/usercache/hadoop/appcache/application_1442869100946_0001/container_1442869100946_0001_01_56/tmp
> '-Dspark.akka.failure-detector.threshold=3000.0'
> '-Dspark.akka.heartbeat.interval=1s' '-Dspark.akka.threads=4'
> '-Dspark.history.ui.port=18080' '-Dspark.akka.heartbeat.pauses=6s'
> '-Dspark.akka.timeout=1000s' '-Dspark.akka.frameSize=50'
> '-Dspark.driver.port=52690'
> -Dspark.yarn.app.container.log.dir=/var/log/hadoop-yarn/containers/application_1442869100946_0001/container_1442869100946_0001_01_56
> org.apache.spark.executor.CoarseGrainedExecutorBackend --driver-url
> akka.tcp://sparkDriver@10.0.24.153:52690/user/CoarseGrainedScheduler
> --executor-id 55 --hostname ip-10-0-28-96.ec2.internal --cores 8 --app-id
> application_1442869100946_0001 --user-class-path
> file:/mnt/yarn/usercache/hadoop/appcache/application_1442869100946_0001/container_1442869100946_0001_01_56/__app__.jar
> 1>
> /var/log/hadoop-yarn/containers/application_1442869100946_0001/container_1442869100946_0001_01_56/stdout
> 2>
> /var/log/hadoop-yarn/containers/application_1442869100946_0001/container_1442869100946_0001_01_56/stderr
>
>
>
> Is it possible to get what scratch space is used for?
>
> What spark setting should I try to adjust to solve the issue?
>
> On Thu, Sep 10, 2015 at 2:52 PM, Sandy Ryza 
> wrote:
>
>> YARN will never kill processes for 

Re: word count (group by users) in spark

2015-09-21 Thread Zhang, Jingyu
Spark spills data to disk when there is more data shuffled onto a single
executor machine than can fit in memory. However, it flushes out the data
to disk one key at a time - so if a single key has more key-value pairs
than can fit in memory, an out of memory exception occurs.

Cheers,

Jingyu
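
A small sketch of how the per-user counts can be built without ever grouping all of one user's raw words into a single in-memory sequence, assuming an existing SparkContext sc (toy data; aggregation happens per (user, word) pair and then per user as small count maps):

val comments = sc.parallelize(Seq(("kali", "A B A B B"), ("james", "B A A A B")))

val perUserCounts = comments
  .flatMap { case (user, text) => text.split("\\s+").map(word => ((user, word), 1)) }
  .reduceByKey(_ + _)                                    // counts per (user, word) key, no giant values
  .map { case ((user, word), n) => (user, Map(word -> n)) }
  .reduceByKey { (a, b) =>
    // merge two small word->count maps for the same user
    a ++ b.map { case (w, n) => w -> (a.getOrElse(w, 0) + n) }
  }

perUserCounts.collect().foreach { case (user, counts) =>
  println(user + " " + counts.map { case (w, n) => s"$w [$n]" }.mkString(" "))
}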

On 21 September 2015 at 16:39, Aniket Bhatnagar 
wrote:

> Unless I am mistaken, in a group by operation, it spills to disk in case
> values for a key don't fit in memory.
>
> Thanks,
> Aniket
>
> On Mon, Sep 21, 2015 at 10:43 AM Huy Banh  wrote:
>
>> Hi,
>>
>> If your input format is user -> comment, then you could:
>>
>> val comments = sc.parallelize(List(("u1", "one two one"), ("u2", "three
>> four three")))
>> val wordCounts = comments.
>>flatMap({case (user, comment) =>
>> for (word <- comment.split(" ")) yield(((user, word), 1)) }).
>>reduceByKey(_ + _)
>>
>> val output = wordCounts.
>>map({case ((user, word), count) => (user, (word, count))}).
>>groupByKey()
>>
>> By Aniket, if we group by user first, it could run out of memory when
>> spark tries to put all words in a single sequence, couldn't it?
>>
>> On Sat, Sep 19, 2015 at 11:05 PM Aniket Bhatnagar <
>> aniket.bhatna...@gmail.com> wrote:
>>
>>> Using scala API, you can first group by user and then use combineByKey.
>>>
>>> Thanks,
>>> Aniket
>>>
>>> On Sat, Sep 19, 2015, 6:41 PM kali.tumm...@gmail.com <
>>> kali.tumm...@gmail.com> wrote:
>>>
 Hi All,
 I would like to achieve this below output using spark , I managed to
 write
 in Hive and call it in spark but not in just spark (scala), how to group
 word counts on particular user (column) for example.
 Imagine users and their given tweets I want to do word count based on
 user
 name.

 Input:-
 kaliA,B,A,B,B
 james B,A,A,A,B

 Output:-
 kali A [Count] B [Count]
 James A [Count] B [Count]

 My Hive Answer:-
 CREATE EXTERNAL TABLE  TEST
 (
  user_name string   ,
  COMMENTS  STRING

 )  ROW FORMAT DELIMITED FIELDS TERMINATED BY '\001'  STORED AS TEXTFILE
 LOCATION '/data/kali/test';    HDFS FOLDER (create hdfs folder and
 create a text file with data mentioned in the email)

 use default;select user_name,COLLECT_SET(text) from (select
 user_name,concat(sub,' ',count(comments)) as text  from test LATERAL
 VIEW
 explode(split(comments,',')) subView AS sub group by user_name,sub)w
 group
 by user_name;

 Spark With Hive:-
 package com.examples

 /**
  * Created by kalit_000 on 17/09/2015.
  */
 import org.apache.log4j.Logger
 import org.apache.log4j.Level
 import org.apache.spark.sql.SQLContext
 import org.apache.spark.sql.hive.HiveContext
 import org.apache.spark.{SparkContext, SparkConf}
 import org.apache.spark.SparkContext._


 object HiveWordCount {

   def main(args: Array[String]): Unit = {
     Logger.getLogger("org").setLevel(Level.WARN)
     Logger.getLogger("akka").setLevel(Level.WARN)

     val conf = new SparkConf().setMaster("local").setAppName("HiveWordCount")
       .set("spark.executor.memory", "1g")
     val sc = new SparkContext(conf)
     val sqlContext = new SQLContext(sc)

     val hc = new HiveContext(sc)

     hc.sql("CREATE EXTERNAL TABLE IF NOT EXISTS default.TEST (user_name string, COMMENTS STRING) " +
       "ROW FORMAT DELIMITED FIELDS TERMINATED BY '\\001' STORED AS TEXTFILE LOCATION '/data/kali/test'")

     val op = hc.sql("select user_name, COLLECT_SET(text) from (select " +
       "user_name, concat(sub, ' ', count(comments)) as text from default.test " +
       "LATERAL VIEW explode(split(comments, ',')) subView AS sub group by user_name, sub) w " +
       "group by user_name")

     op.collect.foreach(println)
   }
 }




 Thanks




 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/word-count-group-by-users-in-spark-tp24748.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.





Re: Spark on Yarn vs Standalone

2015-09-21 Thread Sandy Ryza
The warning you're seeing in Spark is not an issue.  The scratch space lives
inside the heap, so it'll never result in YARN killing the container by
itself.  The issue is that Spark is using some off-heap space on top of
that.

You'll need to bump the spark.yarn.executor.memoryOverhead property to give
the executors some additional headroom above the heap space.
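
For example, something like this (the numbers are only illustrative; the default
overhead is a small fraction of the executor heap with a 384MB floor):

    spark-submit --master yarn \
      --executor-memory 47g \
      --conf spark.yarn.executor.memoryOverhead=4096 \
      ...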

-Sandy

On Mon, Sep 21, 2015 at 5:43 PM, Saisai Shao  wrote:

> I think you need to increase the memory size of executor through command
> arguments "--executor-memory", or configuration "spark.executor.memory".
>
> Also yarn.scheduler.maximum-allocation-mb in Yarn side if necessary.
>
> Thanks
> Saisai
>
>
> On Mon, Sep 21, 2015 at 5:13 PM, Alexander Pivovarov  > wrote:
>
>> I noticed that some executors have issue with scratch space.
>> I see the following in yarn app container stderr around the time when
>> yarn killed the executor because it uses too much memory.
>>
>> -- App container stderr --
>> 15/09/21 21:43:22 WARN storage.MemoryStore: Not enough space to cache
>> rdd_6_346 in memory! (computed 3.0 GB so far)
>> 15/09/21 21:43:22 INFO storage.MemoryStore: Memory use = 477.6 KB
>> (blocks) + 24.4 GB (scratch space shared across 8 thread(s)) = 24.4 GB.
>> Storage limit = 25.2 GB.
>> 15/09/21 21:43:22 WARN spark.CacheManager: Persisting partition rdd_6_346
>> to disk instead.
>> 15/09/21 21:43:22 WARN storage.MemoryStore: Not enough space to cache
>> rdd_6_49 in memory! (computed 3.1 GB so far)
>> 15/09/21 21:43:22 INFO storage.MemoryStore: Memory use = 477.6 KB
>> (blocks) + 24.4 GB (scratch space shared across 8 thread(s)) = 24.4 GB.
>> Storage limit = 25.2 GB.
>> 15/09/21 21:43:22 WARN spark.CacheManager: Persisting partition rdd_6_49
>> to disk instead.
>>
>> -- Yarn Nodemanager log --
>> 2015-09-21 21:44:05,716 WARN
>> org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl
>> (Container Monitor): Container
>> [pid=5114,containerID=container_1442869100946_0001_01_0
>> 00056] is running beyond physical memory limits. Current usage: 52.2 GB
>> of 52 GB physical memory used; 53.0 GB of 260 GB virtual memory used.
>> Killing container.
>> Dump of the process-tree for container_1442869100946_0001_01_56 :
>> |- PID PPID PGRPID SESSID CMD_NAME USER_MODE_TIME(MILLIS)
>> SYSTEM_TIME(MILLIS) VMEM_USAGE(BYTES) RSSMEM_USAGE(PAGES) FULL_CMD_LINE
>> |- 5117 5114 5114 5114 (java) 1322810 27563 56916316160 13682772
>> /usr/lib/jvm/java-openjdk/bin/java -server -XX:OnOutOfMemoryError=kill %p
>> -Xms47924m -Xmx47924m -verbose:gc -XX:+PrintGCDetails
>> -XX:+PrintGCDateStamps -XX:+UseConcMarkSweepGC
>> -XX:CMSInitiatingOccupancyFraction=70 -XX:MaxHeapFreeRatio=70
>> -XX:+CMSClassUnloadingEnabled
>> -Djava.io.tmpdir=/mnt/yarn/usercache/hadoop/appcache/application_1442869100946_0001/container_1442869100946_0001_01_56/tmp
>> -Dspark.akka.failure-detector.threshold=3000.0
>> -Dspark.akka.heartbeat.interval=1s -Dspark.akka.threads=4
>> -Dspark.history.ui.port=18080 -Dspark.akka.heartbeat.pauses=6s
>> -Dspark.akka.timeout=1000s -Dspark.akka.frameSize=50
>> -Dspark.driver.port=52690
>> -Dspark.yarn.app.container.log.dir=/var/log/hadoop-yarn/containers/application_1442869100946_0001/container_1442869100946_0001_01_56
>> org.apache.spark.executor.CoarseGrainedExecutorBackend --driver-url
>> akka.tcp://sparkDriver@10.0.24.153:52690/user/CoarseGrainedScheduler
>> --executor-id 55 --hostname ip-10-0-28-96.ec2.internal --cores 8 --app-id
>> application_1442869100946_0001 --user-class-path
>> file:/mnt/yarn/usercache/hadoop/appcache/application_1442869100946_0001/container_1442869100946_0001_01_56/__app__.jar
>> |- 5114 5112 5114 5114 (bash) 0 0 9658368 291 /bin/bash -c
>> /usr/lib/jvm/java-openjdk/bin/java -server -XX:OnOutOfMemoryError='kill %p'
>> -Xms47924m -Xmx47924m '-verbose:gc' '-XX:+PrintGCDetails'
>> '-XX:+PrintGCDateStamps' '-XX:+UseConcMarkSweepGC'
>> '-XX:CMSInitiatingOccupancyFraction=70' '-XX:MaxHeapFreeRatio=70'
>> '-XX:+CMSClassUnloadingEnabled'
>> -Djava.io.tmpdir=/mnt/yarn/usercache/hadoop/appcache/application_1442869100946_0001/container_1442869100946_0001_01_56/tmp
>> '-Dspark.akka.failure-detector.threshold=3000.0'
>> '-Dspark.akka.heartbeat.interval=1s' '-Dspark.akka.threads=4'
>> '-Dspark.history.ui.port=18080' '-Dspark.akka.heartbeat.pauses=6s'
>> '-Dspark.akka.timeout=1000s' '-Dspark.akka.frameSize=50'
>> '-Dspark.driver.port=52690'
>> -Dspark.yarn.app.container.log.dir=/var/log/hadoop-yarn/containers/application_1442869100946_0001/container_1442869100946_0001_01_56
>> org.apache.spark.executor.CoarseGrainedExecutorBackend --driver-url
>> akka.tcp://sparkDriver@10.0.24.153:52690/user/CoarseGrainedScheduler
>> --executor-id 55 --hostname ip-10-0-28-96.ec2.internal --cores 8 --app-id
>> application_1442869100946_0001 --user-class-path
>> 

Re: Spark on Yarn vs Standalone

2015-09-21 Thread Alexander Pivovarov
I repartitioned input RDD from 4,800 to 24,000 partitions
After that the stage (24000 tasks) was done in 22 min on 100 boxes
Shuffle read/write: 905 GB / 710 GB

Task Metrics (Dur/GC/Read/Write)
Min: 7s/1s/38MB/30MB
Med: 22s/9s/38MB/30MB
Max:1.8min/1.6min/38MB/30MB

On Mon, Sep 21, 2015 at 5:55 PM, Sandy Ryza  wrote:

> The warning you're seeing in Spark is not an issue.  The scratch space lives
> inside the heap, so it'll never result in YARN killing the container by
> itself.  The issue is that Spark is using some off-heap space on top of
> that.
>
> You'll need to bump the spark.yarn.executor.memoryOverhead property to
> give the executors some additional headroom above the heap space.
>
> -Sandy
>
> On Mon, Sep 21, 2015 at 5:43 PM, Saisai Shao 
> wrote:
>
>> I think you need to increase the memory size of executor through command
>> arguments "--executor-memory", or configuration "spark.executor.memory".
>>
>> Also yarn.scheduler.maximum-allocation-mb in Yarn side if necessary.
>>
>> Thanks
>> Saisai
>>
>>
>> On Mon, Sep 21, 2015 at 5:13 PM, Alexander Pivovarov <
>> apivova...@gmail.com> wrote:
>>
>>> I noticed that some executors have issue with scratch space.
>>> I see the following in yarn app container stderr around the time when
>>> yarn killed the executor because it uses too much memory.
>>>
>>> -- App container stderr --
>>> 15/09/21 21:43:22 WARN storage.MemoryStore: Not enough space to cache
>>> rdd_6_346 in memory! (computed 3.0 GB so far)
>>> 15/09/21 21:43:22 INFO storage.MemoryStore: Memory use = 477.6 KB
>>> (blocks) + 24.4 GB (scratch space shared across 8 thread(s)) = 24.4 GB.
>>> Storage limit = 25.2 GB.
>>> 15/09/21 21:43:22 WARN spark.CacheManager: Persisting partition
>>> rdd_6_346 to disk instead.
>>> 15/09/21 21:43:22 WARN storage.MemoryStore: Not enough space to cache
>>> rdd_6_49 in memory! (computed 3.1 GB so far)
>>> 15/09/21 21:43:22 INFO storage.MemoryStore: Memory use = 477.6 KB
>>> (blocks) + 24.4 GB (scratch space shared across 8 thread(s)) = 24.4 GB.
>>> Storage limit = 25.2 GB.
>>> 15/09/21 21:43:22 WARN spark.CacheManager: Persisting partition rdd_6_49
>>> to disk instead.
>>>
>>> -- Yarn Nodemanager log --
>>> 2015-09-21 21:44:05,716 WARN
>>> org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl
>>> (Container Monitor): Container
>>> [pid=5114,containerID=container_1442869100946_0001_01_0
>>> 00056] is running beyond physical memory limits. Current usage: 52.2 GB
>>> of 52 GB physical memory used; 53.0 GB of 260 GB virtual memory used.
>>> Killing container.
>>> Dump of the process-tree for container_1442869100946_0001_01_56 :
>>> |- PID PPID PGRPID SESSID CMD_NAME USER_MODE_TIME(MILLIS)
>>> SYSTEM_TIME(MILLIS) VMEM_USAGE(BYTES) RSSMEM_USAGE(PAGES) FULL_CMD_LINE
>>> |- 5117 5114 5114 5114 (java) 1322810 27563 56916316160 13682772
>>> /usr/lib/jvm/java-openjdk/bin/java -server -XX:OnOutOfMemoryError=kill %p
>>> -Xms47924m -Xmx47924m -verbose:gc -XX:+PrintGCDetails
>>> -XX:+PrintGCDateStamps -XX:+UseConcMarkSweepGC
>>> -XX:CMSInitiatingOccupancyFraction=70 -XX:MaxHeapFreeRatio=70
>>> -XX:+CMSClassUnloadingEnabled
>>> -Djava.io.tmpdir=/mnt/yarn/usercache/hadoop/appcache/application_1442869100946_0001/container_1442869100946_0001_01_56/tmp
>>> -Dspark.akka.failure-detector.threshold=3000.0
>>> -Dspark.akka.heartbeat.interval=1s -Dspark.akka.threads=4
>>> -Dspark.history.ui.port=18080 -Dspark.akka.heartbeat.pauses=6s
>>> -Dspark.akka.timeout=1000s -Dspark.akka.frameSize=50
>>> -Dspark.driver.port=52690
>>> -Dspark.yarn.app.container.log.dir=/var/log/hadoop-yarn/containers/application_1442869100946_0001/container_1442869100946_0001_01_56
>>> org.apache.spark.executor.CoarseGrainedExecutorBackend --driver-url
>>> akka.tcp://sparkDriver@10.0.24.153:52690/user/CoarseGrainedScheduler
>>> --executor-id 55 --hostname ip-10-0-28-96.ec2.internal --cores 8 --app-id
>>> application_1442869100946_0001 --user-class-path
>>> file:/mnt/yarn/usercache/hadoop/appcache/application_1442869100946_0001/container_1442869100946_0001_01_56/__app__.jar
>>> |- 5114 5112 5114 5114 (bash) 0 0 9658368 291 /bin/bash -c
>>> /usr/lib/jvm/java-openjdk/bin/java -server -XX:OnOutOfMemoryError='kill %p'
>>> -Xms47924m -Xmx47924m '-verbose:gc' '-XX:+PrintGCDetails'
>>> '-XX:+PrintGCDateStamps' '-XX:+UseConcMarkSweepGC'
>>> '-XX:CMSInitiatingOccupancyFraction=70' '-XX:MaxHeapFreeRatio=70'
>>> '-XX:+CMSClassUnloadingEnabled'
>>> -Djava.io.tmpdir=/mnt/yarn/usercache/hadoop/appcache/application_1442869100946_0001/container_1442869100946_0001_01_56/tmp
>>> '-Dspark.akka.failure-detector.threshold=3000.0'
>>> '-Dspark.akka.heartbeat.interval=1s' '-Dspark.akka.threads=4'
>>> '-Dspark.history.ui.port=18080' '-Dspark.akka.heartbeat.pauses=6s'
>>> '-Dspark.akka.timeout=1000s' '-Dspark.akka.frameSize=50'
>>> '-Dspark.driver.port=52690'
>>> 

Troubleshooting "Task not serializable" in Spark/Scala environments

2015-09-21 Thread Balaji Vijayan
Howdy,

I'm a relative novice at Spark/Scala and I'm puzzled by some behavior that
I'm seeing in 2 of my local Spark/Scala environments (Scala for Jupyter and
Scala IDE) but not the 3rd (Spark Shell). The following code throws the
following stack trace error in the former 2 environments but executes
successfully in the 3rd. I'm not sure how to go about troubleshooting my
former 2 environments so any assistance is greatly appreciated.

Code:

//get file
val logFile = "s3n://file"
val logData  = sc.textFile(logFile)
// header
val header =  logData.first
// filter out header
val sample = logData.filter(!_.contains(header)).map {
 line => line.replaceAll("['\"]","").substring(0,line.length()-1)
}.takeSample(false,100,12L)

Stack Trace:

org.apache.spark.SparkException: Task not serializable

org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:315)

org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:305)
org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:132)
org.apache.spark.SparkContext.clean(SparkContext.scala:1893)
org.apache.spark.rdd.RDD$$anonfun$filter$1.apply(RDD.scala:311)
org.apache.spark.rdd.RDD$$anonfun$filter$1.apply(RDD.scala:310)

org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)

org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
org.apache.spark.rdd.RDD.withScope(RDD.scala:286)
org.apache.spark.rdd.RDD.filter(RDD.scala:310)
cmd6$$user$$anonfun$3.apply(Main.scala:134)
cmd6$$user$$anonfun$3.apply(Main.scala:133)
java.io.NotSerializableException: org.apache.spark.SparkConf
Serialization stack:
- object not serializable (class: org.apache.spark.SparkConf, value:
org.apache.spark.SparkConf@309ed441)
- field (class: cmd2$$user, name: conf, type: class 
org.apache.spark.SparkConf)
- object (class cmd2$$user, cmd2$$user@75a88665)
- field (class: cmd6, name: $ref$cmd2, type: class cmd2$$user)
- object (class cmd6, cmd6@5e9e8f0b)
- field (class: cmd6$$user, name: $outer, type: class cmd6)
- object (class cmd6$$user, cmd6$$user@692f81c)
- field (class: cmd6$$user$$anonfun$3, name: $outer, type: class 
cmd6$$user)
- object (class cmd6$$user$$anonfun$3, )
- field (class: cmd6$$user$$anonfun$3$$anonfun$apply$1, name: $outer,
type: class cmd6$$user$$anonfun$3)
- object (class cmd6$$user$$anonfun$3$$anonfun$apply$1, )

org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)

org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)

org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:81)

org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:312)

org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:305)
org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:132)
org.apache.spark.SparkContext.clean(SparkContext.scala:1893)
org.apache.spark.rdd.RDD$$anonfun$filter$1.apply(RDD.scala:311)
org.apache.spark.rdd.RDD$$anonfun$filter$1.apply(RDD.scala:310)

org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)

org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
org.apache.spark.rdd.RDD.withScope(RDD.scala:286)
org.apache.spark.rdd.RDD.filter(RDD.scala:310)
cmd6$$user$$anonfun$3.apply(Main.scala:134)
cmd6$$user$$anonfun$3.apply(Main.scala:133)

Thanks,
Balaji


Re: Troubleshooting "Task not serializable" in Spark/Scala environments

2015-09-21 Thread Alexis Gillain
As Igor said, the header must be available on each partition, so the solution is
broadcasting it.

About the difference between the REPL and Scala IDE, it may come from the
SparkContext setup, as the REPL defines one by default.
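
Something along these lines (an untested sketch of the same pipeline, with the
header shipped as a broadcast variable and dropRight in place of the
length-based substring) should avoid pulling the enclosing object into the
closure:

    val headerBc = sc.broadcast(logData.first)           // ship only the header value
    val sample = logData
      .filter(line => !line.contains(headerBc.value))    // the closure captures the broadcast handle, not 'this'
      .map(line => line.replaceAll("['\"]", "").dropRight(1))
      .takeSample(false, 100, 12L)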

2015-09-22 8:41 GMT+08:00 Igor Berman :

> Try to broadcast the header
> On Sep 22, 2015 08:07, "Balaji Vijayan" 
> wrote:
>
>> Howdy,
>>
>> I'm a relative novice at Spark/Scala and I'm puzzled by some behavior
>> that I'm seeing in 2 of my local Spark/Scala environments (Scala for
>> Jupyter and Scala IDE) but not the 3rd (Spark Shell). The following code
>> throws the following stack trace error in the former 2 environments but
>> executes successfully in the 3rd. I'm not sure how to go about
>> troubleshooting my former 2 environments so any assistance is greatly
>> appreciated.
>>
>> Code:
>>
>> //get file
>> val logFile = "s3n://file"
>> val logData  = sc.textFile(logFile)
>> // header
>> val header =  logData.first
>> // filter out header
>> val sample = logData.filter(!_.contains(header)).map {
>>  line => line.replaceAll("['\"]","").substring(0,line.length()-1)
>> }.takeSample(false,100,12L)
>>
>> Stack Trace:
>>
>> org.apache.spark.SparkException: Task not serializable
>>  
>> org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:315)
>>  
>> org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:305)
>>  org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:132)
>>  org.apache.spark.SparkContext.clean(SparkContext.scala:1893)
>>  org.apache.spark.rdd.RDD$$anonfun$filter$1.apply(RDD.scala:311)
>>  org.apache.spark.rdd.RDD$$anonfun$filter$1.apply(RDD.scala:310)
>>  
>> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
>>  
>> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
>>  org.apache.spark.rdd.RDD.withScope(RDD.scala:286)
>>  org.apache.spark.rdd.RDD.filter(RDD.scala:310)
>>  cmd6$$user$$anonfun$3.apply(Main.scala:134)
>>  cmd6$$user$$anonfun$3.apply(Main.scala:133)
>> java.io.NotSerializableException: org.apache.spark.SparkConf
>> Serialization stack:
>>  - object not serializable (class: org.apache.spark.SparkConf, value: 
>> org.apache.spark.SparkConf@309ed441)
>>  - field (class: cmd2$$user, name: conf, type: class 
>> org.apache.spark.SparkConf)
>>  - object (class cmd2$$user, cmd2$$user@75a88665)
>>  - field (class: cmd6, name: $ref$cmd2, type: class cmd2$$user)
>>  - object (class cmd6, cmd6@5e9e8f0b)
>>  - field (class: cmd6$$user, name: $outer, type: class cmd6)
>>  - object (class cmd6$$user, cmd6$$user@692f81c)
>>  - field (class: cmd6$$user$$anonfun$3, name: $outer, type: class 
>> cmd6$$user)
>>  - object (class cmd6$$user$$anonfun$3, )
>>  - field (class: cmd6$$user$$anonfun$3$$anonfun$apply$1, name: $outer, 
>> type: class cmd6$$user$$anonfun$3)
>>  - object (class cmd6$$user$$anonfun$3$$anonfun$apply$1, )
>>  
>> org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
>>  
>> org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
>>  
>> org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:81)
>>  
>> org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:312)
>>  
>> org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:305)
>>  org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:132)
>>  org.apache.spark.SparkContext.clean(SparkContext.scala:1893)
>>  org.apache.spark.rdd.RDD$$anonfun$filter$1.apply(RDD.scala:311)
>>  org.apache.spark.rdd.RDD$$anonfun$filter$1.apply(RDD.scala:310)
>>  
>> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
>>  
>> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
>>  org.apache.spark.rdd.RDD.withScope(RDD.scala:286)
>>  org.apache.spark.rdd.RDD.filter(RDD.scala:310)
>>  cmd6$$user$$anonfun$3.apply(Main.scala:134)
>>  cmd6$$user$$anonfun$3.apply(Main.scala:133)
>>
>> Thanks,
>> Balaji
>>
>


-- 
Alexis GILLAIN


Re: How does one use s3 for checkpointing?

2015-09-21 Thread Jerry Lam
Hi Amit,

Have you looked at Amazon EMR? Most people using EMR use s3 for persistency 
(both as input and output of spark jobs). 

Best Regards,

Jerry

Sent from my iPhone

> On 21 Sep, 2015, at 9:24 pm, Amit Ramesh  wrote:
> 
> 
> A lot of places in the documentation mention using s3 for checkpointing, 
> however I haven't found any examples or concrete evidence of anyone having 
> done this.
> 1. Is this a safe/reliable option given the read-after-write consistency for 
> PUTS in s3?
> 2. Is s3 access broken for hadoop 2.6 (SPARK-7442)? If so, is it viable in 2.4?
> 3. Related to #2. I did try providing hadoop-aws-2.6.0.jar while submitting the 
> job and got the following stack trace. Is there a fix?
> py4j.protocol.Py4JJavaError: An error occurred while calling 
> None.org.apache.spark.api.java.JavaSparkContext.
> : java.util.ServiceConfigurationError: org.apache.hadoop.fs.FileSystem: 
> Provider org.apache.hadoop.fs.s3a.S3AFileSystem could not be instantiated
> at java.util.ServiceLoader.fail(ServiceLoader.java:224)
> at java.util.ServiceLoader.access$100(ServiceLoader.java:181)
> at java.util.ServiceLoader$LazyIterator.next(ServiceLoader.java:377)
> at java.util.ServiceLoader$1.next(ServiceLoader.java:445)
> at 
> org.apache.hadoop.fs.FileSystem.loadFileSystems(FileSystem.java:2563)
> at 
> org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2574)
> at 
> org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2591)
> at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:91)
> at 
> org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2630)
> at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2612)
> at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:370)
> at org.apache.hadoop.fs.Path.getFileSystem(Path.java:296)
> at org.apache.spark.SparkContext.addFile(SparkContext.scala:1354)
> at org.apache.spark.SparkContext.addFile(SparkContext.scala:1332)
> at 
> org.apache.spark.SparkContext$$anonfun$15.apply(SparkContext.scala:475)
> at 
> org.apache.spark.SparkContext$$anonfun$15.apply(SparkContext.scala:475)
> at scala.collection.immutable.List.foreach(List.scala:318)
> at org.apache.spark.SparkContext.<init>(SparkContext.scala:475)
> at 
> org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:61)
> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native 
> Method)
> at 
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
> at 
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
> at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:234)
> at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
> at py4j.Gateway.invoke(Gateway.java:214)
> at 
> py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:79)
> at 
> py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:68)
> at py4j.GatewayConnection.run(GatewayConnection.java:207)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.NoClassDefFoundError: 
> com/amazonaws/AmazonServiceException
> at java.lang.Class.getDeclaredConstructors0(Native Method)
> at java.lang.Class.privateGetDeclaredConstructors(Class.java:2585)
> at java.lang.Class.getConstructor0(Class.java:2885)
> at java.lang.Class.newInstance(Class.java:350)
> at java.util.ServiceLoader$LazyIterator.next(ServiceLoader.java:373)
> ... 27 more
> Caused by: java.lang.ClassNotFoundException: 
> com.amazonaws.AmazonServiceException
> at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
> at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
> at java.security.AccessController.doPrivileged(Native Method)
> at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
> ... 32 more
> 
> Thanks!
> Amit
> 


Spark Streaming distributed job

2015-09-21 Thread nibiau
Hello, 
Could you please explain what exactly is distributed when I launch a Spark 
Streaming job over a YARN cluster?
My code is something like :

JavaDStream<JMSEvent> customReceiverStream =
    ssc.receiverStream(streamConfig.getJmsReceiver());

JavaDStream<String> incoming_msg = customReceiverStream.map(
    new Function<JMSEvent, String>() {
        public String call(JMSEvent jmsEvent) {
            return jmsEvent.getText();
        }
    }
);

incoming_msg.foreachRDD(new Function<JavaRDD<String>, Void>() {
    public Void call(JavaRDD<String> rdd) throws Exception {
        rdd.foreachPartition(new VoidFunction<Iterator<String>>() {
            @Override
            public void call(Iterator<String> msg) throws Exception {
                while (msg.hasNext()) {
                    // insert message in MongoDB
                    msg.next();
                }
            }
        });
        return null;
    }
});


So, in this code, at what step does the distribution over YARN happen:
- Is my receiver distributed (and so all the rest as well)?
- Is the foreachRDD distributed (and so all the rest as well)?
- Is foreachPartition distributed?

Tks
Nicolas




Re: Troubleshooting "Task not serializable" in Spark/Scala environments

2015-09-21 Thread Ted Yu
Which release are you using ?

From the line number in ClosureCleaner, it seems you're using 1.4.x

Cheers

On Mon, Sep 21, 2015 at 4:07 PM, Balaji Vijayan 
wrote:

> Howdy,
>
> I'm a relative novice at Spark/Scala and I'm puzzled by some behavior that
> I'm seeing in 2 of my local Spark/Scala environments (Scala for Jupyter and
> Scala IDE) but not the 3rd (Spark Shell). The following code throws the
> following stack trace error in the former 2 environments but executes
> successfully in the 3rd. I'm not sure how to go about troubleshooting my
> former 2 environments so any assistance is greatly appreciated.
>
> Code:
>
> //get file
> val logFile = "s3n://file"
> val logData  = sc.textFile(logFile)
> // header
> val header =  logData.first
> // filter out header
> val sample = logData.filter(!_.contains(header)).map {
>  line => line.replaceAll("['\"]","").substring(0,line.length()-1)
> }.takeSample(false,100,12L)
>
> Stack Trace:
>
> org.apache.spark.SparkException: Task not serializable
>   
> org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:315)
>   
> org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:305)
>   org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:132)
>   org.apache.spark.SparkContext.clean(SparkContext.scala:1893)
>   org.apache.spark.rdd.RDD$$anonfun$filter$1.apply(RDD.scala:311)
>   org.apache.spark.rdd.RDD$$anonfun$filter$1.apply(RDD.scala:310)
>   
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
>   
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
>   org.apache.spark.rdd.RDD.withScope(RDD.scala:286)
>   org.apache.spark.rdd.RDD.filter(RDD.scala:310)
>   cmd6$$user$$anonfun$3.apply(Main.scala:134)
>   cmd6$$user$$anonfun$3.apply(Main.scala:133)
> java.io.NotSerializableException: org.apache.spark.SparkConf
> Serialization stack:
>   - object not serializable (class: org.apache.spark.SparkConf, value: 
> org.apache.spark.SparkConf@309ed441)
>   - field (class: cmd2$$user, name: conf, type: class 
> org.apache.spark.SparkConf)
>   - object (class cmd2$$user, cmd2$$user@75a88665)
>   - field (class: cmd6, name: $ref$cmd2, type: class cmd2$$user)
>   - object (class cmd6, cmd6@5e9e8f0b)
>   - field (class: cmd6$$user, name: $outer, type: class cmd6)
>   - object (class cmd6$$user, cmd6$$user@692f81c)
>   - field (class: cmd6$$user$$anonfun$3, name: $outer, type: class 
> cmd6$$user)
>   - object (class cmd6$$user$$anonfun$3, )
>   - field (class: cmd6$$user$$anonfun$3$$anonfun$apply$1, name: $outer, 
> type: class cmd6$$user$$anonfun$3)
>   - object (class cmd6$$user$$anonfun$3$$anonfun$apply$1, )
>   
> org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
>   
> org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
>   
> org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:81)
>   
> org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:312)
>   
> org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:305)
>   org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:132)
>   org.apache.spark.SparkContext.clean(SparkContext.scala:1893)
>   org.apache.spark.rdd.RDD$$anonfun$filter$1.apply(RDD.scala:311)
>   org.apache.spark.rdd.RDD$$anonfun$filter$1.apply(RDD.scala:310)
>   
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
>   
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
>   org.apache.spark.rdd.RDD.withScope(RDD.scala:286)
>   org.apache.spark.rdd.RDD.filter(RDD.scala:310)
>   cmd6$$user$$anonfun$3.apply(Main.scala:134)
>   cmd6$$user$$anonfun$3.apply(Main.scala:133)
>
> Thanks,
> Balaji
>


Re: how to get RDD from two different RDDs with cross column

2015-09-21 Thread Zhiliang Zhu
Hi Romi,
Yes, you understand it correctly. The rdd1 keys are cross with the rdd2 keys; that 
is, there are lots of keys shared between rdd1 and rdd2, there are some keys 
in rdd1 but not in rdd2, and there are also some keys in rdd2 but not in rdd1. Then 
the rdd3 keys would be the same as the rdd1 keys, so rdd3 will not include the keys 
that are in rdd2 but not in rdd1. The values of rdd3 will come from rdd2; if a key 
in rdd3 is not in rdd2, its value would NOT exist.

You are always so helpful with Spark and always have a solution for these 
questions; I really appreciate it very much.
Thank you very much~
Zhiliang  


 On Tuesday, September 22, 2015 4:08 AM, Romi Kuntsman  
wrote:
   

 Hi,
If I understand correctly:
rdd1 contains keys (of type StringDate)
rdd2 contains keys and values
and rdd3 contains all the keys, and the values from rdd2?

I think you should make rdd1 and rdd2 PairRDD, and then use outer join.
Does that make sense?
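
For example, roughly like this (an untested sketch, assuming rdd1: RDD[String]
holds the StringDate keys and rdd2: RDD[(String, Float)] holds the
(StringDate, value) pairs):

    val rdd1Pairs = rdd1.map(d => (d, ()))      // turn rdd1 into a pair RDD
    val rdd3 = rdd1Pairs
      .leftOuterJoin(rdd2)                      // keeps every key of rdd1
      .mapValues { case (_, v) => v }           // v: Option[Float], None where rdd2 has no match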

On Mon, Sep 21, 2015 at 8:37 PM Zhiliang Zhu  wrote:

Dear Romi, Priya, Sujit and Shivaram and all,

I have taken lots of days to think about this issue, however, without any good 
enough solution... I shall appreciate all your kind help.

There is an RDD rdd1, and another RDD rdd2 
(rdd2 can be a PairRDD, or a DataFrame with two columns as <StringDate, float>). 
The StringDate column values from rdd1 and rdd2 are cross but not the same.

I would like to get a new RDD rdd3: the StringDate in rdd3 would 
be all from (the same as) rdd1, and the float in rdd3 would be from rdd2 if its 
StringDate is in rdd2, or else NULL would be assigned.
Each row in rdd3 is rdd3[ i ] = <StringDate, float>; if 
rdd2[ i ].StringDate is the same as rdd1[ i ].StringDate, 
then rdd2[ i ].float is assigned to the float part of rdd3[ i ]. What kind of API 
or function would I use...

Thanks very much!
Zhiliang




  

spark.mesos.coarse impacts memory performance on mesos

2015-09-21 Thread Utkarsh Sengar
I am running Spark 1.4.1 on mesos.

The spark job does a "cartesian" of 4 RDDs (aRdd, bRdd, cRdd, dRdd) of size
100, 100, 7 and 1 respectively. Let's call it productRDD.

Creation of "aRdd" needs data pull from multiple data sources, merging it
and creating a tuple of JavaRdd, finally aRDD looks something like this:
JavaRDD>
bRdd, cRdd and dRdds are just List<> of values.

Then apply a transformation on productRDD and finally call "saveAsTextFile"
to save the result of my transformation.

Problem:
By setting "spark.mesos.coarse=true", creation of "aRdd" works fine but the
driver crashes while doing the cartesian, but when I set
"spark.mesos.coarse=false", the job works like a charm. I am running spark
on mesos.

Comments:
So I wanted to understand what role does "spark.mesos.coarse=true" plays in
terms of memory and compute performance. My findings look counter intuitive
since:

   1. "spark.mesos.coarse=true" just runs on 1 mesos task, so there should
   be an overhead of spinning up mesos tasks which should impact the
   performance.
   2. What config for "spark.mesos.coarse" recommended for running spark on
   mesos? Or there is no best answer and it depends on usecase?
   3. Also by setting "spark.mesos.coarse=true", I notice that I get huge
   GC pauses even with small dataset but a long running job (but this can be a
   separate discussion).

Let me know if I am missing something obvious, we are learning spark tuning
as we move forward :)

-- 
Thanks,
-Utkarsh


How does one use s3 for checkpointing?

2015-09-21 Thread Amit Ramesh
A lot of places in the documentation mention using s3 for checkpointing,
however I haven't found any examples or concrete evidence of anyone having
done this.

   1. Is this a safe/reliable option given the read-after-write consistency
   for PUTS in s3?
   2. Is s3 access broken for hadoop 2.6 (SPARK-7442
   )? If so, is it viable
   in 2.4?
   3. Related to #2. I did try providing hadoop-aws-2.6.0.jar while
   submitting the job and got the following stack trace. Is there a fix?

py4j.protocol.Py4JJavaError: An error occurred while calling
None.org.apache.spark.api.java.JavaSparkContext.
: java.util.ServiceConfigurationError: org.apache.hadoop.fs.FileSystem:
Provider org.apache.hadoop.fs.s3a.S3AFileSystem could not be instantiated
at java.util.ServiceLoader.fail(ServiceLoader.java:224)
at java.util.ServiceLoader.access$100(ServiceLoader.java:181)
at java.util.ServiceLoader$LazyIterator.next(ServiceLoader.java:377)
at java.util.ServiceLoader$1.next(ServiceLoader.java:445)
at
org.apache.hadoop.fs.FileSystem.loadFileSystems(FileSystem.java:2563)
at
org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2574)
at
org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2591)
at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:91)
at
org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2630)
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2612)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:370)
at org.apache.hadoop.fs.Path.getFileSystem(Path.java:296)
at org.apache.spark.SparkContext.addFile(SparkContext.scala:1354)
at org.apache.spark.SparkContext.addFile(SparkContext.scala:1332)
at
org.apache.spark.SparkContext$$anonfun$15.apply(SparkContext.scala:475)
at
org.apache.spark.SparkContext$$anonfun$15.apply(SparkContext.scala:475)
at scala.collection.immutable.List.foreach(List.scala:318)
at org.apache.spark.SparkContext.<init>(SparkContext.scala:475)
at
org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:61)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native
Method)
at
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
at
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:234)
at
py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
at py4j.Gateway.invoke(Gateway.java:214)
at
py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:79)
at
py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:68)
at py4j.GatewayConnection.run(GatewayConnection.java:207)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.NoClassDefFoundError:
com/amazonaws/AmazonServiceException
at java.lang.Class.getDeclaredConstructors0(Native Method)
at java.lang.Class.privateGetDeclaredConstructors(Class.java:2585)
at java.lang.Class.getConstructor0(Class.java:2885)
at java.lang.Class.newInstance(Class.java:350)
at java.util.ServiceLoader$LazyIterator.next(ServiceLoader.java:373)
... 27 more
Caused by: java.lang.ClassNotFoundException:
com.amazonaws.AmazonServiceException
at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
... 32 more

Thanks!
Amit


Re: Troubleshooting "Task not serializable" in Spark/Scala environments

2015-09-21 Thread Igor Berman
Try to broadcast the header
On Sep 22, 2015 08:07, "Balaji Vijayan"  wrote:

> Howdy,
>
> I'm a relative novice at Spark/Scala and I'm puzzled by some behavior that
> I'm seeing in 2 of my local Spark/Scala environments (Scala for Jupyter and
> Scala IDE) but not the 3rd (Spark Shell). The following code throws the
> following stack trace error in the former 2 environments but executes
> successfully in the 3rd. I'm not sure how to go about troubleshooting my
> former 2 environments so any assistance is greatly appreciated.
>
> Code:
>
> //get file
> val logFile = "s3n://file"
> val logData  = sc.textFile(logFile)
> // header
> val header =  logData.first
> // filter out header
> val sample = logData.filter(!_.contains(header)).map {
>  line => line.replaceAll("['\"]","").substring(0,line.length()-1)
> }.takeSample(false,100,12L)
>
> Stack Trace:
>
> org.apache.spark.SparkException: Task not serializable
>   
> org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:315)
>   
> org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:305)
>   org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:132)
>   org.apache.spark.SparkContext.clean(SparkContext.scala:1893)
>   org.apache.spark.rdd.RDD$$anonfun$filter$1.apply(RDD.scala:311)
>   org.apache.spark.rdd.RDD$$anonfun$filter$1.apply(RDD.scala:310)
>   
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
>   
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
>   org.apache.spark.rdd.RDD.withScope(RDD.scala:286)
>   org.apache.spark.rdd.RDD.filter(RDD.scala:310)
>   cmd6$$user$$anonfun$3.apply(Main.scala:134)
>   cmd6$$user$$anonfun$3.apply(Main.scala:133)
> java.io.NotSerializableException: org.apache.spark.SparkConf
> Serialization stack:
>   - object not serializable (class: org.apache.spark.SparkConf, value: 
> org.apache.spark.SparkConf@309ed441)
>   - field (class: cmd2$$user, name: conf, type: class 
> org.apache.spark.SparkConf)
>   - object (class cmd2$$user, cmd2$$user@75a88665)
>   - field (class: cmd6, name: $ref$cmd2, type: class cmd2$$user)
>   - object (class cmd6, cmd6@5e9e8f0b)
>   - field (class: cmd6$$user, name: $outer, type: class cmd6)
>   - object (class cmd6$$user, cmd6$$user@692f81c)
>   - field (class: cmd6$$user$$anonfun$3, name: $outer, type: class 
> cmd6$$user)
>   - object (class cmd6$$user$$anonfun$3, )
>   - field (class: cmd6$$user$$anonfun$3$$anonfun$apply$1, name: $outer, 
> type: class cmd6$$user$$anonfun$3)
>   - object (class cmd6$$user$$anonfun$3$$anonfun$apply$1, )
>   
> org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
>   
> org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
>   
> org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:81)
>   
> org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:312)
>   
> org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:305)
>   org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:132)
>   org.apache.spark.SparkContext.clean(SparkContext.scala:1893)
>   org.apache.spark.rdd.RDD$$anonfun$filter$1.apply(RDD.scala:311)
>   org.apache.spark.rdd.RDD$$anonfun$filter$1.apply(RDD.scala:310)
>   
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
>   
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
>   org.apache.spark.rdd.RDD.withScope(RDD.scala:286)
>   org.apache.spark.rdd.RDD.filter(RDD.scala:310)
>   cmd6$$user$$anonfun$3.apply(Main.scala:134)
>   cmd6$$user$$anonfun$3.apply(Main.scala:133)
>
> Thanks,
> Balaji
>


Iterator-based streaming, how is it efficient ?

2015-09-21 Thread Samuel Hailu
Hi,

In Spark's in-memory logic, without cache, elements are accessed in an
iterator-based streaming style [
http://www.slideshare.net/liancheng/dtcc-14-spark-runtime-internals?next_slideshow=1
]

I have two questions:


   1. if elements are read one line at a time from HDFS (disk) and then
   transformed based on the rdd operations, how is this efficient?
   2. which class in the Spark source does this? I'm expecting some kind of:

   for (partition_index <- iterator_over_a_partition)
       read_hdfs_line(partition_index).apply_transformation()
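
In plain Scala terms, I imagine the chaining works something like this toy
sketch (my own guess, not the real Spark classes), where each transformation
just wraps the parent iterator so only one element is live at a time:

    val lines: Iterator[String] = scala.io.Source.fromFile("part-00000").getLines()
    val result: Iterator[Int] = lines
      .filter(_.nonEmpty)       // wraps the parent iterator, nothing is read yet
      .map(_.length)            // wraps again, still lazy
    result.foreach(println)     // elements stream through one at a time

Is it something like HadoopRDD.compute / MapPartitionsRDD.compute that builds
these per-partition iterators?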


Thanks,


Re: Spark on Yarn vs Standalone

2015-09-21 Thread Alexander Pivovarov
I noticed that some executors have issue with scratch space.
I see the following in yarn app container stderr around the time when yarn
killed the executor because it uses too much memory.

-- App container stderr --
15/09/21 21:43:22 WARN storage.MemoryStore: Not enough space to cache
rdd_6_346 in memory! (computed 3.0 GB so far)
15/09/21 21:43:22 INFO storage.MemoryStore: Memory use = 477.6 KB (blocks)
+ 24.4 GB (scratch space shared across 8 thread(s)) = 24.4 GB. Storage
limit = 25.2 GB.
15/09/21 21:43:22 WARN spark.CacheManager: Persisting partition rdd_6_346
to disk instead.
15/09/21 21:43:22 WARN storage.MemoryStore: Not enough space to cache
rdd_6_49 in memory! (computed 3.1 GB so far)
15/09/21 21:43:22 INFO storage.MemoryStore: Memory use = 477.6 KB (blocks)
+ 24.4 GB (scratch space shared across 8 thread(s)) = 24.4 GB. Storage
limit = 25.2 GB.
15/09/21 21:43:22 WARN spark.CacheManager: Persisting partition rdd_6_49 to
disk instead.

-- Yarn Nodemanager log --
2015-09-21 21:44:05,716 WARN
org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl
(Container Monitor): Container
[pid=5114,containerID=container_1442869100946_0001_01_0
00056] is running beyond physical memory limits. Current usage: 52.2 GB of
52 GB physical memory used; 53.0 GB of 260 GB virtual memory used. Killing
container.
Dump of the process-tree for container_1442869100946_0001_01_56 :
|- PID PPID PGRPID SESSID CMD_NAME USER_MODE_TIME(MILLIS)
SYSTEM_TIME(MILLIS) VMEM_USAGE(BYTES) RSSMEM_USAGE(PAGES) FULL_CMD_LINE
|- 5117 5114 5114 5114 (java) 1322810 27563 56916316160 13682772
/usr/lib/jvm/java-openjdk/bin/java -server -XX:OnOutOfMemoryError=kill %p
-Xms47924m -Xmx47924m -verbose:gc -XX:+PrintGCDetails
-XX:+PrintGCDateStamps -XX:+UseConcMarkSweepGC
-XX:CMSInitiatingOccupancyFraction=70 -XX:MaxHeapFreeRatio=70
-XX:+CMSClassUnloadingEnabled
-Djava.io.tmpdir=/mnt/yarn/usercache/hadoop/appcache/application_1442869100946_0001/container_1442869100946_0001_01_56/tmp
-Dspark.akka.failure-detector.threshold=3000.0
-Dspark.akka.heartbeat.interval=1s -Dspark.akka.threads=4
-Dspark.history.ui.port=18080 -Dspark.akka.heartbeat.pauses=6s
-Dspark.akka.timeout=1000s -Dspark.akka.frameSize=50
-Dspark.driver.port=52690
-Dspark.yarn.app.container.log.dir=/var/log/hadoop-yarn/containers/application_1442869100946_0001/container_1442869100946_0001_01_56
org.apache.spark.executor.CoarseGrainedExecutorBackend --driver-url
akka.tcp://sparkDriver@10.0.24.153:52690/user/CoarseGrainedScheduler
--executor-id 55 --hostname ip-10-0-28-96.ec2.internal --cores 8 --app-id
application_1442869100946_0001 --user-class-path
file:/mnt/yarn/usercache/hadoop/appcache/application_1442869100946_0001/container_1442869100946_0001_01_56/__app__.jar
|- 5114 5112 5114 5114 (bash) 0 0 9658368 291 /bin/bash -c
/usr/lib/jvm/java-openjdk/bin/java -server -XX:OnOutOfMemoryError='kill %p'
-Xms47924m -Xmx47924m '-verbose:gc' '-XX:+PrintGCDetails'
'-XX:+PrintGCDateStamps' '-XX:+UseConcMarkSweepGC'
'-XX:CMSInitiatingOccupancyFraction=70' '-XX:MaxHeapFreeRatio=70'
'-XX:+CMSClassUnloadingEnabled'
-Djava.io.tmpdir=/mnt/yarn/usercache/hadoop/appcache/application_1442869100946_0001/container_1442869100946_0001_01_56/tmp
'-Dspark.akka.failure-detector.threshold=3000.0'
'-Dspark.akka.heartbeat.interval=1s' '-Dspark.akka.threads=4'
'-Dspark.history.ui.port=18080' '-Dspark.akka.heartbeat.pauses=6s'
'-Dspark.akka.timeout=1000s' '-Dspark.akka.frameSize=50'
'-Dspark.driver.port=52690'
-Dspark.yarn.app.container.log.dir=/var/log/hadoop-yarn/containers/application_1442869100946_0001/container_1442869100946_0001_01_56
org.apache.spark.executor.CoarseGrainedExecutorBackend --driver-url
akka.tcp://sparkDriver@10.0.24.153:52690/user/CoarseGrainedScheduler
--executor-id 55 --hostname ip-10-0-28-96.ec2.internal --cores 8 --app-id
application_1442869100946_0001 --user-class-path
file:/mnt/yarn/usercache/hadoop/appcache/application_1442869100946_0001/container_1442869100946_0001_01_56/__app__.jar
1>
/var/log/hadoop-yarn/containers/application_1442869100946_0001/container_1442869100946_0001_01_56/stdout
2>
/var/log/hadoop-yarn/containers/application_1442869100946_0001/container_1442869100946_0001_01_56/stderr



Is it possible to get what scratch space is used for?

What spark setting should I try to adjust to solve the issue?

On Thu, Sep 10, 2015 at 2:52 PM, Sandy Ryza  wrote:

> YARN will never kill processes for being unresponsive.
>
> It may kill processes for occupying more memory than it allows.  To get
> around this, you can either bump spark.yarn.executor.memoryOverhead or turn
> off the memory checks entirely with yarn.nodemanager.pmem-check-enabled.
>
> -Sandy
>
> On Tue, Sep 8, 2015 at 10:48 PM, Alexander Pivovarov  > wrote:
>
>> The problem which we have now is skew data (2360 tasks done in 5 min, 3
>> tasks in 40 min and 1 task in 2 hours)
>>

Re: Remove duplicate keys by always choosing first in file.

2015-09-21 Thread Sean Owen
I think foldByKey is much more what you want, as it has more a notion
of building up some result per key by encountering values serially.
You would take the first and ignore the rest. Note that "first"
depends on your RDD having an ordering to begin with, or else you rely
on however it happens to be ordered after whatever operations give you
a key-value RDD.
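
For what it's worth, the scheme you describe collapses to something like this
with RDD.zipWithIndex (an untested sketch; it assumes rows: RDD[(K, Row)] keyed
in the order textFile produced them):

    val firstPerKey = rows
      .zipWithIndex()                                      // ((key, row), globalIndex); indices follow partition order
      .map { case ((key, row), idx) => (key, (idx, row)) }
      .reduceByKey((a, b) => if (a._1 <= b._1) a else b)   // keep the smallest index per key
      .mapValues(_._2)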

On Tue, Sep 22, 2015 at 1:26 AM, Philip Weaver  wrote:
> I am processing a single file and want to remove duplicate rows by some key
> by always choosing the first row in the file for that key.
>
> The best solution I could come up with is to zip each row with the partition
> index and local index, like this:
>
> rdd.mapPartitionsWithIndex { case (partitionIndex, rows) =>
>   rows.zipWithIndex.map { case (row, localIndex) => (row.key,
> ((partitionIndex, localIndex), row)) }
> }
>
>
> And then using reduceByKey with a min ordering on the (partitionIndex,
> localIndex) pair.
>
> First, can i count on SparkContext.textFile to read the lines in such that
> the partition indexes are always increasing so that the above works?
>
> And, is there a better way to accomplish the same effect?
>
> Thanks!
>
> - Philip
>




Re: Why are executors on slave never used?

2015-09-21 Thread Hemant Bhanawat
When you specify master as local[2], it starts the spark components in a
single jvm. You need to specify the master correctly.
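
For example, a rough sketch for an EMR/YARN cluster (the exact master string
depends on your setup) would be:

    from pyspark import SparkConf, SparkContext

    conf = SparkConf().setAppName("my-app").setMaster("yarn-client")  # instead of "local[2]"
    sc = SparkContext(conf=conf)

Or leave the master out of the code entirely and pass it at submit time with
spark-submit --master yarn-client, so the same script also runs locally.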
I have a default AWS EMR cluster (1 master, 1 slave) with Spark. When I run
a Spark process, it works fine -- but only on the master, as if it were
standalone.

The web-UI and logging code shows only 1 executor, the localhost.

How can I diagnose this?

(I create *SparkConf, *in Python, with *setMaster('local[2]'). )*

(Strangely, though I don't think that this causes the problem, there is
almost nothing spark-related on the slave machine:* /usr/lib/spark *has a
few jars, but that's it:  *datanucleus-api-jdo.jar  datanucleus-core.jar
 datanucleus-rdbms.jar  spark-yarn-shuffle.jar. *But this is an AWS EMR
cluster as created by* create-cluster*, so I would assume that the slave
and master are configured OK out-of the box.)

Joshua


Spark SQL DataFrame 1.5.0 is extremely slow for take(1) or head() or first()

2015-09-21 Thread Jerry Lam
Hi Spark Developers,

I just ran some very simple operations on a dataset. I was surprised by the
execution plan of take(1), head() or first().

For your reference, this is what I did in pyspark 1.5:
df=sqlContext.read.parquet("someparquetfiles")
df.head()

The above lines take over 15 minutes. I was frustrated because I can do
better without using spark :) Since I like spark, so I tried to figure out
why. It seems the dataframe requires 3 stages to give me the first row. It
reads all data (which is about 1 billion rows) and runs Limit twice.

Instead of head(), show(1) runs much faster. Not to mention that if I do:

df.rdd.take(1) //runs much faster.

Is this expected? Why head/first/take is so slow for dataframe? Is it a bug
in the optimizer? or I did something wrong?

Best Regards,

Jerry


Python Packages in Spark w/Mesos

2015-09-21 Thread John Omernik
Hey all -

Curious about the best way to include Python packages (such as NLTK) in my
Spark installation. Basically I am running on Mesos, and would like to find a
way to include the package in the binary distribution, since I don't want to
install packages on all nodes.  We should be able to include it in the
distribution, right?

I thought of using the Docker Mesos integration, but I have been unable to
find information on this (see my other question on Docker/Mesos/Spark).
Any other thoughts on the best way to include packages in Spark WITHOUT
installing on each node would be appreciated!

John


AWS_CREDENTIAL_FILE

2015-09-21 Thread Michel Lemay
Hi,

It looks like spark does read AWS credentials from environment variable
AWS_CREDENTIAL_FILE like awscli does.


Mike


Re: Count for select not matching count for group by

2015-09-21 Thread Richard Hillegas
For what it's worth, I get the expected result that "filter" behaves like
"group by" when I run the same experiment against a DataFrame which was
loaded from a relational store:

import org.apache.spark.sql._
import org.apache.spark.sql.types._

val df = sqlContext.read.format("jdbc").options(
  Map("url" -> "jdbc:derby:/Users/rhillegas/derby/databases/derby1",
  "dbtable" -> "app.outcomes")).load()

df.select("OUTCOME").groupBy("OUTCOME").count.show
#
# returns:
#
# +---+-+
# |OUTCOME|count|
# +---+-+
# |  A|  128|
# |  B|  256|
# +---+-+

df.filter("OUTCOME = 'A'").count
#
# returns:
#
# res1: Long = 128


df.registerTempTable("test_data")
sqlContext.sql("select OUTCOME, count( OUTCOME ) from test_data group by
OUTCOME").show
#
# returns:
#
# +---+---+
# |OUTCOME|_c1|
# +---+---+
# |  A|128|
# |  B|256|
# +---+---+

Thanks,
-Rick

Michael Kelly  wrote on 09/21/2015 08:06:29
AM:

> From: Michael Kelly 
> To: user@spark.apache.org
> Date: 09/21/2015 08:08 AM
> Subject: Count for select not matching count for group by
>
> Hi,
>
> I'm seeing some strange behaviour with spark 1.5, I have a dataframe
> that I have built from loading and joining some hive tables stored in
> s3.
>
> The dataframe is cached in memory, using df.cache.
>
> What I'm seeing is that the counts I get when I do a group by on a
> column are different from what I get when I filter/select and count.
>
> df.select("outcome").groupBy("outcome").count.show
> outcome | count
> --
> 'A'   |  100
> 'B'   |  200
>
> df.filter("outcome = 'A'").count
> # 50
>
> df.filter(df("outcome") === "A").count
> # 50
>
> I expect the count of columns that match 'A' in the groupBy to match
> the count when filtering. Any ideas what might be happening?
>
> Thanks,
>
> Michael
>
>

Why are executors on slave never used?

2015-09-21 Thread Joshua Fox
I have a default AWS EMR cluster (1 master, 1 slave) with Spark. When I run
a Spark process, it works fine -- but only on the master, as if it were
standalone.

The web-UI and logging code shows only 1 executor, the localhost.

How can I diagnose this?

(I create *SparkConf, *in Python, with *setMaster('local[2]'). )*

(Strangely, though I don't think that this causes the problem, there is
almost nothing spark-related on the slave machine:* /usr/lib/spark *has a
few jars, but that's it:  *datanucleus-api-jdo.jar  datanucleus-core.jar
 datanucleus-rdbms.jar  spark-yarn-shuffle.jar. *But this is an AWS EMR
cluster as created by* create-cluster*, so I would assume that the slave
and master are configured OK out-of the box.)

Joshua


Re: How to get a new RDD by ordinarily subtract its adjacent rows

2015-09-21 Thread Zhiliang Zhu
Hi Romi,
Thanks very much for your kind help and comments~~

In fact there is some valid background for the application; it is about R data 
analysis:

#fund_nav_daily is an M x N (or M x 1) matrix or data.frame, col is
#each daily fund return, row is the daily date
#fund_return_daily needs to compute each fund's daily return minus the
#previous day's return
fund_return_daily <- diff(log(fund_nav_daily))
#the first row would be all 0, since there is no previous row ahead of the first row
fund_return_daily <- rbind(matrix(0, ncol = ncol(fund_return_daily)),
fund_return_daily)
...

I need to code exactly this R program by way of Spark, with RDD/DataFrame used 
to replace the R data.frame; however, I just found that it is VERY difficult to 
make the Spark program flexibly describe & transform the R background 
application. I think I have seriously put myself at risk over this...

Would you help direct me a bit on the above coding issue... and on my risk in 
practicing Spark/R applications...
I must show all my sincere thanks towards your kind help.

P.S. Currently, with SparkR in Spark 1.4.1, there are many bugs in the 
createDataFrame/except/unionAll APIs, and it seems that Spark's Java API has more 
functions than SparkR. Also, no specific R regression algorithm is included in SparkR.

Best Regards,
Zhiliang


 On Monday, September 21, 2015 7:36 PM, Romi Kuntsman  
wrote:
   

 RDD is a set of data rows (in your case numbers), there is no meaning for the 
order of the items.
What exactly are you trying to accomplish?

Romi Kuntsman, Big Data Engineer
http://www.totango.com

On Mon, Sep 21, 2015 at 2:29 PM, Zhiliang Zhu  
wrote:

Dear , 

I have taken lots of days to think about this issue, however, without any 
success... I shall appreciate all your kind help.
There is an RDD rdd1; I would like to get a new RDD rdd2, where each row is 
rdd2[ i ] = rdd1[ i ] - rdd1[ i - 1 ]. What kind of API or function would I use...

Thanks very much!
John




  

Re: SparkR - calling as.vector() with rdd dataframe causes error

2015-09-21 Thread Ellen Kraffmiller
Thank you for the link! I was using
http://apache-spark-user-list.1001560.n3.nabble.com/, and I didn't see
replies there.

Regarding your code example, I'm doing the same thing and successfully
creating the rdd, but the problem is that when I call a clustering
algorithm like amap::hcluster(), I get an error from as.vector() that the
rdd cannot be coerced into a vector.

On Fri, Sep 18, 2015 at 12:33 PM, Luciano Resende 
wrote:

> I see the thread with all the responses on the bottom at mail-archive :
>
> https://www.mail-archive.com/user%40spark.apache.org/msg36882.html
>
> On Fri, Sep 18, 2015 at 7:58 AM, Ellen Kraffmiller <
> ellen.kraffmil...@gmail.com> wrote:
>
>> Thanks for your response.  Is there a reason why this thread isn't
>> appearing on the mailing list?  So far, I only see my post, with no
>> answers, although I have received 2 answers via email.  It would be nice if
>> other people could see these answers as well.
>>
>> On Thu, Sep 17, 2015 at 2:22 AM, Sun, Rui  wrote:
>>
>>> The existing algorithms operating on R data.frame can't simply operate
>>> on SparkR DataFrame. They have to be re-implemented to be based on SparkR
>>> DataFrame API.
>>>
>>> -Original Message-
>>> From: ekraffmiller [mailto:ellen.kraffmil...@gmail.com]
>>> Sent: Thursday, September 17, 2015 3:30 AM
>>> To: user@spark.apache.org
>>> Subject: SparkR - calling as.vector() with rdd dataframe causes error
>>>
>>> Hi,
>>> I have a library of clustering algorithms that I'm trying to run in the
>>> SparkR interactive shell. (I am working on a proof of concept for a
>>> document classification tool.) Each algorithm takes a term document matrix
>>> in the form of a dataframe.  When I pass the method a local dataframe, the
>>> clustering algorithm works correctly, but when I pass it a spark rdd, it
>>> gives an error trying to coerce the data into a vector.  Here is the code,
>>> that I'm calling within SparkR:
>>>
>>> # get matrix from a file
>>> file <-
>>>
>>> "/Applications/spark-1.5.0-bin-hadoop2.6/examples/src/main/resources/matrix.csv"
>>>
>>> #read it into variable
>>>  raw_data <- read.csv(file,sep=',',header=FALSE)
>>>
>>> #convert to a local dataframe
>>> localDF = data.frame(raw_data)
>>>
>>> # create the rdd
>>> rdd  <- createDataFrame(sqlContext,localDF)
>>>
>>> #call the algorithm with the localDF - this works
>>> result <- galileo(localDF, model='hclust',dist='euclidean',link='ward',K=5)
>>>
>>> #call with the rdd - this produces error
>>> result <- galileo(rdd, model='hclust',dist='euclidean',link='ward',K=5)
>>>
>>> Error in as.vector(data) :
>>>   no method for coercing this S4 class to a vector
>>>
>>>
>>> I get the same error if I try to directly call as.vector(rdd) as well.
>>>
>>> Is there a reason why this works for localDF and not rdd?  Should I be
>>> doing something else to coerce the object into a vector?
>>>
>>> Thanks,
>>> Ellen
>>>
>>>
>>>
>>> --
>>> View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/SparkR-calling-as-vector-with-rdd-dataframe-causes-error-tp24717.html
>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>
>>> -
>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For
>>> additional commands, e-mail: user-h...@spark.apache.org
>>>
>>>
>>
>
>
> --
> Luciano Resende
> http://people.apache.org/~lresende
> http://twitter.com/lresende1975
> http://lresende.blogspot.com/
>


Re: How to get a new RDD by ordinarily subtract its adjacent rows

2015-09-21 Thread Sujit Pal
Hi Zhiliang,

Would something like this work?

val rdd2 = rdd1.sliding(2).map(v => v(1) - v(0))

-sujit
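A note for anyone trying this: sliding() is not defined on plain RDDs; it comes from the MLlib RDDFunctions implicit, so it needs an extra import. A minimal self-contained sketch (the sample numbers are made up, and it assumes the element order of rdd1 is meaningful, e.g. already sorted by date):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.rdd.RDDFunctions._   // brings sliding() into scope

val sc = new SparkContext(new SparkConf().setAppName("sliding-diff-sketch").setMaster("local[*]"))

val rdd1 = sc.parallelize(Seq(1.0, 3.0, 6.0, 10.0))
// each window w is Array(previous, current), so w(1) - w(0) is the adjacent difference
val rdd2 = rdd1.sliding(2).map(w => w(1) - w(0))

rdd2.collect().foreach(println)   // 2.0, 5.0, 4.0
sc.stop()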


On Mon, Sep 21, 2015 at 7:58 AM, Zhiliang Zhu 
wrote:

> Hi Romi,
>
> Thanks very much for your kind help comment~~
>
> In fact there is some valid backgroud of the application, it is about R
> data analysis.
> ...
> #fund_nav_daily is a M X N (or M X 1) matrix or data.frame, col is each
> daily fund return, row is the daily date
> #fund_return_daily needs to count the each fund's daily return subtracted
> the previous day's return
> fund_return_daily <- diff(log(fund_nav_daily))
>
> #the first row would be all 0, since there is no previous row ahead first
> row
> fund_return_daily <- rbind(matrix(0,ncol = ncol(fund_return_daily)),
> fund_return_daily)
> ...
>
> I need to exactly code the R program by way of spark, then RDD/DataFrame
> is used to replace R data.frame,
> however, I just found that it is VERY MUCH diffcult to make the spark
> program to flexibly descript & transform R backgroud applications.
> I think I have seriously lost myself into risk about this...
>
> Would you help direct me some about the above coding issue... and my risk
> about practice in spark/R application...
>
> I must show all my sincere thanks torwards your kind help.
>
> P.S. currently sparkR in spark 1.4.1 , there is many bug in the API
> createDataFrame/except/unionAll, and it seems
> that spark Java has more functions than sparkR.
> Also, no specific R regression algorithmn is including in sparkR .
>
> Best Regards,
> Zhiliang
>
>
> On Monday, September 21, 2015 7:36 PM, Romi Kuntsman 
> wrote:
>
>
> RDD is a set of data rows (in your case numbers), there is no meaning for
> the order of the items.
> What exactly are you trying to accomplish?
>
> *Romi Kuntsman*, *Big Data Engineer*
> http://www.totango.com
>
> On Mon, Sep 21, 2015 at 2:29 PM, Zhiliang Zhu  > wrote:
>
> Dear ,
>
> I have took lots of days to think into this issue, however, without any
> success...
> I shall appreciate your all kind help.
>
> There is an RDD rdd1, I would like get a new RDD rdd2, each row
> in rdd2[ i ] = rdd1[ i ] - rdd[i - 1] .
> What kinds of API or function would I use...
>
>
> Thanks very much!
> John
>
>
>
>
>


Exception initializing JavaSparkContext

2015-09-21 Thread ekraffmiller
Hi,
I’m trying to run a simple test program to access Spark through Java.  I’m
using JDK 1.8, and Spark 1.5.  I’m getting an Exception from the
JavaSparkContext constructor.  My initialization code matches all the sample
code I’ve found online, so not sure what I’m doing wrong.

Here is my code:

SparkConf conf = new SparkConf().setAppName("Simple Application");
conf.setMaster("local");
conf.setAppName("my app");
JavaSparkContext sc = new JavaSparkContext(conf);

The stack trace of the Exception:

java.lang.ExceptionInInitializerError: null
at java.lang.Class.getField(Class.java:1690)
at
org.apache.spark.util.SparkShutdownHookManager.install(ShutdownHookManager.scala:220)
at
org.apache.spark.util.ShutdownHookManager$.shutdownHooks$lzycompute(ShutdownHookManager.scala:50)
at
org.apache.spark.util.ShutdownHookManager$.shutdownHooks(ShutdownHookManager.scala:48)
at
org.apache.spark.util.ShutdownHookManager$.addShutdownHook(ShutdownHookManager.scala:189)
at
org.apache.spark.util.ShutdownHookManager$.(ShutdownHookManager.scala:58)
at
org.apache.spark.util.ShutdownHookManager$.(ShutdownHookManager.scala)
at
org.apache.spark.storage.DiskBlockManager.addShutdownHook(DiskBlockManager.scala:147)
at
org.apache.spark.storage.DiskBlockManager.(DiskBlockManager.scala:54)
at org.apache.spark.storage.BlockManager.(BlockManager.scala:75)
at org.apache.spark.storage.BlockManager.(BlockManager.scala:173)
at org.apache.spark.SparkEnv$.create(SparkEnv.scala:345)
at org.apache.spark.SparkEnv$.createDriverEnv(SparkEnv.scala:193)
at org.apache.spark.SparkContext.createSparkEnv(SparkContext.scala:276)
at org.apache.spark.SparkContext.(SparkContext.scala:441)
at
org.apache.spark.api.java.JavaSparkContext.(JavaSparkContext.scala:61)
at
edu.harvard.iq.text.core.spark.SparkControllerTest.testMongoRDD(SparkControllerTest.java:63)

Thanks,
Ellen



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Exception-initializing-JavaSparkContext-tp24755.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Python Packages in Spark w/Mesos

2015-09-21 Thread Tim Chen
Hi John,

Sorry, I haven't had time to respond to your questions over the weekend.

If you're running client mode, to use the Docker/Mesos integration
minimally you just need to set the image configuration
'spark.mesos.executor.docker.image' as stated in the documentation; Spark
will use this image to run each Spark executor.

Therefore, if you want to include your python dependencies, you can also
pre-install them in that image, and Spark should be able to find them if you
set the PYTHON env variables pointing to those. I'm not very familiar with
python, but I recently got Mesos cluster mode with python to work and it's
merged into master.

Tim
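For reference, a minimal sketch of what that client-mode configuration could look like; the image name and Mesos master URL below are placeholders, and the image is assumed to already have the Python packages (e.g. NLTK) installed:

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("docker-executor-sketch")
  .setMaster("mesos://mesos-master.example.com:5050")                        // placeholder master URL
  .set("spark.mesos.executor.docker.image", "example/spark-with-nltk:1.5.0") // hypothetical pre-built image
val sc = new SparkContext(conf)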

On Mon, Sep 21, 2015 at 8:34 AM, John Omernik  wrote:

> Hey all -
>
> Curious at the best way to include python packages in my Spark
> installation. (Such as NLTK). Basically I am running on Mesos, and would
> like to find a way to include the package in the binary distribution in
> that I don't want to install packages on all nodes.  We should be able to
> include in the distribution, right?.
>
> I thought of using the Docker Mesos integration, but I have been unable to
> find information on this (see my other question on Docker/Mesos/Spark).
> Any other thoughts on the best way to include packages in Spark WITHOUT
> installing on each node would be appreciated!
>
> John
>


how to get RDD from two different RDDs with cross column

2015-09-21 Thread Zhiliang Zhu
Dear Romi, Priya, Sujit, Shivaram and all,

I have took lots of days to think into this issue, however, without any enough
good solution... I shall appreciate your all kind help.

There is an RDD rdd1, and another RDD rdd2,
(rdd2 can be PairRDD, or DataFrame with two columns as ).
StringDate column values from rdd1 and rdd2 are cross but not the same.

I would like to get a new RDD rdd3, StringDate in rdd3 would
be all from (same) as rdd1, and float in rdd3 would be from rdd2 if its
StringDate is in rdd2, or else NULL would be assigned.
each row in rdd3[ i ] = ,
rdd2[ i ].StringDate would be same as rdd1[ i ].StringDate,
then rdd2[ i ].float is assigned rdd3[ i ] StringDate part. What kinds of API
or function would I use...

Thanks very much!
Zhiliang



Re: Spark SQL DataFrame 1.5.0 is extremely slow for take(1) or head() or first()

2015-09-21 Thread Yin Huai
Seems 1.4 has the same issue.

On Mon, Sep 21, 2015 at 10:01 AM, Yin Huai  wrote:

> btw, does 1.4 has the same problem?
>
> On Mon, Sep 21, 2015 at 10:01 AM, Yin Huai  wrote:
>
>> Hi Jerry,
>>
>> Looks like it is a Python-specific issue. Can you create a JIRA?
>>
>> Thanks,
>>
>> Yin
>>
>> On Mon, Sep 21, 2015 at 8:56 AM, Jerry Lam  wrote:
>>
>>> Hi Spark Developers,
>>>
>>> I just ran some very simple operations on a dataset. I was surprise by
>>> the execution plan of take(1), head() or first().
>>>
>>> For your reference, this is what I did in pyspark 1.5:
>>> df=sqlContext.read.parquet("someparquetfiles")
>>> df.head()
>>>
>>> The above lines take over 15 minutes. I was frustrated because I can do
>>> better without using spark :) Since I like spark, so I tried to figure out
>>> why. It seems the dataframe requires 3 stages to give me the first row. It
>>> reads all data (which is about 1 billion rows) and run Limit twice.
>>>
>>> Instead of head(), show(1) runs much faster. Not to mention that if I do:
>>>
>>> df.rdd.take(1) //runs much faster.
>>>
>>> Is this expected? Why head/first/take is so slow for dataframe? Is it a
>>> bug in the optimizer? or I did something wrong?
>>>
>>> Best Regards,
>>>
>>> Jerry
>>>
>>
>>
>


Re: Spark SQL DataFrame 1.5.0 is extremely slow for take(1) or head() or first()

2015-09-21 Thread Jerry Lam
Hi Yin,

You are right! I just tried the scala version with the above lines, it
works as expected.
I'm not sure if it happens also in 1.4 for pyspark but I thought the
pyspark code just calls the scala code via py4j. I didn't expect that this
bug is pyspark specific. That surprises me actually a bit. I created a
ticket for this (SPARK-10731
).

Best Regards,

Jerry


On Mon, Sep 21, 2015 at 1:01 PM, Yin Huai  wrote:

> btw, does 1.4 has the same problem?
>
> On Mon, Sep 21, 2015 at 10:01 AM, Yin Huai  wrote:
>
>> Hi Jerry,
>>
>> Looks like it is a Python-specific issue. Can you create a JIRA?
>>
>> Thanks,
>>
>> Yin
>>
>> On Mon, Sep 21, 2015 at 8:56 AM, Jerry Lam  wrote:
>>
>>> Hi Spark Developers,
>>>
>>> I just ran some very simple operations on a dataset. I was surprise by
>>> the execution plan of take(1), head() or first().
>>>
>>> For your reference, this is what I did in pyspark 1.5:
>>> df=sqlContext.read.parquet("someparquetfiles")
>>> df.head()
>>>
>>> The above lines take over 15 minutes. I was frustrated because I can do
>>> better without using spark :) Since I like spark, so I tried to figure out
>>> why. It seems the dataframe requires 3 stages to give me the first row. It
>>> reads all data (which is about 1 billion rows) and run Limit twice.
>>>
>>> Instead of head(), show(1) runs much faster. Not to mention that if I do:
>>>
>>> df.rdd.take(1) //runs much faster.
>>>
>>> Is this expected? Why head/first/take is so slow for dataframe? Is it a
>>> bug in the optimizer? or I did something wrong?
>>>
>>> Best Regards,
>>>
>>> Jerry
>>>
>>
>>
>


Re: Spark SQL DataFrame 1.5.0 is extremely slow for take(1) or head() or first()

2015-09-21 Thread Jerry Lam
I just noticed you found 1.4 has the same issue. I added that as well in
the ticket.

On Mon, Sep 21, 2015 at 1:43 PM, Jerry Lam  wrote:

> Hi Yin,
>
> You are right! I just tried the scala version with the above lines, it
> works as expected.
> I'm not sure if it happens also in 1.4 for pyspark but I thought the
> pyspark code just calls the scala code via py4j. I didn't expect that this
> bug is pyspark specific. That surprises me actually a bit. I created a
> ticket for this (SPARK-10731
> ).
>
> Best Regards,
>
> Jerry
>
>
> On Mon, Sep 21, 2015 at 1:01 PM, Yin Huai  wrote:
>
>> btw, does 1.4 has the same problem?
>>
>> On Mon, Sep 21, 2015 at 10:01 AM, Yin Huai  wrote:
>>
>>> Hi Jerry,
>>>
>>> Looks like it is a Python-specific issue. Can you create a JIRA?
>>>
>>> Thanks,
>>>
>>> Yin
>>>
>>> On Mon, Sep 21, 2015 at 8:56 AM, Jerry Lam  wrote:
>>>
 Hi Spark Developers,

 I just ran some very simple operations on a dataset. I was surprise by
 the execution plan of take(1), head() or first().

 For your reference, this is what I did in pyspark 1.5:
 df=sqlContext.read.parquet("someparquetfiles")
 df.head()

 The above lines take over 15 minutes. I was frustrated because I can do
 better without using spark :) Since I like spark, so I tried to figure out
 why. It seems the dataframe requires 3 stages to give me the first row. It
 reads all data (which is about 1 billion rows) and run Limit twice.

 Instead of head(), show(1) runs much faster. Not to mention that if I
 do:

 df.rdd.take(1) //runs much faster.

 Is this expected? Why head/first/take is so slow for dataframe? Is it a
 bug in the optimizer? or I did something wrong?

 Best Regards,

 Jerry

>>>
>>>
>>
>


JDBCRdd issue

2015-09-21 Thread Saurabh Malviya (samalviy)
Hi,


While using a reference within JdbcRDD, it is throwing a serialization exception.
Does JdbcRDD not accept references from other parts of the code?
 confMap= ConfFactory.getConf(ParquetStreaming)

  val jdbcRDD = new JdbcRDD(sc, () => {
Class.forName("org.apache.phoenix.jdbc.PhoenixDriver")
DriverManager.getConnection(confMap(PHOENIX_URL)) -Throwing 
below exception

//DriverManager.getConnection(ConfFactory.getConf(ParquetStreaming)(PHOENIX_URL))
 ---This works
  }, s"SELECT tenant_id, data_source_id, mne_id, device_type1_key " 
+
 s" FROM XYZ_TYPE1_TEST WHERE DEVICE_TYPE1_KEY >= ? and 
DEVICE_TYPE1_KEY <= ? and TENANT_ID in ($tenantIds) " +
 s" AND DATA_SOURCE_ID in ($dataSourceIds) AND ISDELETED = 
false",
minKey, maxKey, 10, row => DeviceDel(row.getString(1), 
row.getString(2), row.getLong(3), row.getLong(4))).cache()

It throws runtime exception. However, " 
DriverManager.getConnection("jdbc:phoenix:10.20.87.1:2181") "   works fine.

Exception in thread "main" org.apache.spark.SparkException: Job aborted due to 
stage failure: Task not serializable: java.io.NotSerializableException: 
org.apache.spark.SparkContext
Serialization stack:
- object not serializable (class: 
org.apache.spark.SparkContext, value: 
org.apache.spark.SparkContext@5bb273b4)
- field (class: 
advance_reporting.transformations.DeviceDelETL$$anonfun$main$1, name: sc$1, 
type: class org.apache.spark.SparkContext)
- object (class 
advance_reporting.transformations.DeviceDelETL$$anonfun$main$1, )
- field (class: 
advance_reporting.transformations.DeviceDelETL$$anonfun$main$1$$anonfun$6, 
name: $outer, type: class $$anonfun$main$1)
- object (class 
advance_reporting.transformations.DeviceDelETL$$anonfun$main$1$$anonfun$6, 
)
- field (class: org.apache.spark.rdd.JdbcRDD, name: 
org$apache$spark$rdd$JdbcRDD$$getConnection, type: interface scala.Function0)
- object (class org.apache.spark.rdd.JdbcRDD, JdbcRDD[15] at 
JdbcRDD at DeviceDelETL.scala:91)
- field (class: scala.Tuple2, name: _1, type: class 
java.lang.Object)
- object (class scala.Tuple2, (JdbcRDD[15] at JdbcRDD at 
DeviceDelETL.scala:91,))
at 
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1273)
at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1264)
at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1263)
at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at 
scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at 
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1263)
at 
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:878)
at 
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:815)
at 
org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:799)
at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1426)
at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1418)
at 
org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
Sep 18, 2015 12:22:59 PM INFO: parquet.hadoop.ParquetFileReader: Initiating 
action with parallelism: 5

Any idea?
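One workaround that often fixes this kind of trace (a sketch only, assuming the root cause is that the connection-factory closure drags in the enclosing scope that also holds the SparkContext): copy the values the closure needs into local vals first, so only plain strings get serialized. The names below (confMap, PHOENIX_URL, sc, minKey, maxKey, DeviceDel) are taken from the snippet above; the query is trimmed for the sketch.

import java.sql.DriverManager
import org.apache.spark.rdd.JdbcRDD

val phoenixUrl = confMap(PHOENIX_URL)   // resolved on the driver, captured as a plain String

val jdbcRDD = new JdbcRDD(sc, () => {
    Class.forName("org.apache.phoenix.jdbc.PhoenixDriver")
    DriverManager.getConnection(phoenixUrl)   // no reference back to confMap or the enclosing object
  },
  "SELECT tenant_id, data_source_id, mne_id, device_type1_key FROM XYZ_TYPE1_TEST " +
    "WHERE DEVICE_TYPE1_KEY >= ? AND DEVICE_TYPE1_KEY <= ?",   // query trimmed for the sketch
  minKey, maxKey, 10,
  row => DeviceDel(row.getString(1), row.getString(2), row.getLong(3), row.getLong(4))).cache()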



Fwd: Issue with high no of skipped task

2015-09-21 Thread Saurav Sinha
-- Forwarded message --
From: "Saurav Sinha" 
Date: 21-Sep-2015 11:48 am
Subject: Issue with high no of skipped task
To: 
Cc:


Hi Users,

I am new to Spark and I have written a flow. When we first deployed our code it was
completing jobs in 4-5 min. But now it is taking 20+ min to complete with
almost the same set of data. Can you please help me figure out the reason for it?

-- 
Thanks and Regards,

Saurav Sinha

Contact: 9742879062


Re: How to get a new RDD by ordinarily subtract its adjacent rows

2015-09-21 Thread Zhiliang Zhu
Hi Romi,
I must show my sincere appreciation towards your kind & helpful help.
One more question: currently I am using Spark to deal with financial data
analysis, so lots of operations on R data.frame/matrix and stat/regression are
always called. However, SparkR currently is not that strong; most of its
functions are from Spark SQL and MLlib. Then, SQL and DataFrame are not as
flexible & easy as R operating on data.frame/matrix; moreover, I have not yet decided
how much of MLlib could be used for the R-specific stat/regression.
I have also thought of only operating the data by way of Spark Java, but it is
quite hard to act as data.frame/matrix from R. I think I have lost myself in risk
by those.
Would you help comment some on my points...

Thank you very much!
Zhiliang


 


 On Tuesday, September 22, 2015 1:21 AM, Sujit Pal  
wrote:
   

 Hi Zhiliang,
Haven't used the Java API but found this Javadoc page, may be helpful to you.
https://spark.apache.org/docs/1.3.1/api/java/org/apache/spark/mllib/rdd/RDDFunctions.html

I think the equivalent Java code snippet might go something like this:
RDDFunctions.fromRDD(rdd1, ClassTag$.apply(Class)).sliding(2)
(the second parameter of fromRDD comes from this discussion thread):
http://apache-spark-user-list.1001560.n3.nabble.com/how-to-construct-a-ClassTag-object-as-a-method-parameter-in-Java-td6768.html

There is also the SlidingRDD decorator:
https://spark.apache.org/docs/1.3.1/api/java/org/apache/spark/mllib/rdd/SlidingRDD.html

So maybe something like this:
new SlidingRDD(rdd1, 2, ClassTag$.apply(Class))
-sujit

On Mon, Sep 21, 2015 at 9:16 AM, Zhiliang Zhu  wrote:

Hi Sujit,
I must appreciate your kind help very much~
It seems to be OK, however, do you know the corresponding spark Java API 
achievement...Is there any java API as scala sliding, and it seemed that I do 
not find spark scala's doc about sliding ...
Thank you very much~Zhiliang 


 On Monday, September 21, 2015 11:48 PM, Sujit Pal  
wrote:
   

 Hi Zhiliang, 
Would something like this work?
val rdd2 = rdd1.sliding(2).map(v => v(1) - v(0))
-sujit

On Mon, Sep 21, 2015 at 7:58 AM, Zhiliang Zhu  
wrote:

Hi Romi,
Thanks very much for your kind help comment~~
In fact there is some valid backgroud of the application, it is about R data 
analysis #fund_nav_daily is a M X N (or M X 1) matrix or data.frame, col is 
each daily fund return, row is the daily date#fund_return_daily needs to count 
the each fund's daily return subtracted the previous day's return 
fund_return_daily <- diff(log(fund_nav_daily)) 
#the first row would be all 0, since there is no previous row ahead first row
fund_return_daily <- rbind(matrix(0,ncol = ncol(fund_return_daily)), 
fund_return_daily) ...
I need to exactly code the R program by way of spark, then RDD/DataFrame is 
used to replace R data.frame, however, I just found that it is VERY MUCH 
diffcult to make the spark program to flexibly descript & transform R backgroud 
applications.I think I have seriously lost myself into risk about this...
Would you help direct me some about the above coding issue... and my risk about 
practice in spark/R application...
I must show all my sincere thanks torwards your kind help.

P.S. currently sparkR in spark 1.4.1 , there is many bug in the API 
createDataFrame/except/unionAll, and it seemsthat spark Java has more functions 
than sparkR.Also, no specific R regression algorithmn is including in sparkR .
Best Regards,Zhiliang


 On Monday, September 21, 2015 7:36 PM, Romi Kuntsman  
wrote:
   

 RDD is a set of data rows (in your case numbers), there is no meaning for the 
order of the items.
What exactly are you trying to accomplish?

Romi Kuntsman, Big Data Engineer
http://www.totango.com

On Mon, Sep 21, 2015 at 2:29 PM, Zhiliang Zhu  
wrote:

Dear , 

I have took lots of days to think into this issue, however, without any 
success...I shall appreciate your all kind help.
There is an RDD rdd1, I would like get a new RDD rdd2, each row in 
rdd2[ i ] = rdd1[ i ] - rdd[i - 1] .What kinds of API or function would I use...

Thanks very much!John




   



   



  

Re: Exception initializing JavaSparkContext

2015-09-21 Thread Marcelo Vanzin
What Spark package are you using? In particular, which hadoop version?

On Mon, Sep 21, 2015 at 9:14 AM, ekraffmiller
 wrote:
> Hi,
> I’m trying to run a simple test program to access Spark though Java.  I’m
> using JDK 1.8, and Spark 1.5.  I’m getting an Exception from the
> JavaSparkContext constructor.  My initialization code matches all the sample
> code I’ve found online, so not sure what I’m doing wrong.
>
> Here is my code:
>
> SparkConf conf = new SparkConf().setAppName("Simple Application");
> conf.setMaster("local");
> conf.setAppName("my app");
> JavaSparkContext sc = new JavaSparkContext(conf);
>
> The stack trace of the Exception:
>
> java.lang.ExceptionInInitializerError: null
> at java.lang.Class.getField(Class.java:1690)
> at
> org.apache.spark.util.SparkShutdownHookManager.install(ShutdownHookManager.scala:220)
> at
> org.apache.spark.util.ShutdownHookManager$.shutdownHooks$lzycompute(ShutdownHookManager.scala:50)
> at
> org.apache.spark.util.ShutdownHookManager$.shutdownHooks(ShutdownHookManager.scala:48)
> at
> org.apache.spark.util.ShutdownHookManager$.addShutdownHook(ShutdownHookManager.scala:189)
> at
> org.apache.spark.util.ShutdownHookManager$.(ShutdownHookManager.scala:58)
> at
> org.apache.spark.util.ShutdownHookManager$.(ShutdownHookManager.scala)
> at
> org.apache.spark.storage.DiskBlockManager.addShutdownHook(DiskBlockManager.scala:147)
> at
> org.apache.spark.storage.DiskBlockManager.(DiskBlockManager.scala:54)
> at org.apache.spark.storage.BlockManager.(BlockManager.scala:75)
> at 
> org.apache.spark.storage.BlockManager.(BlockManager.scala:173)
> at org.apache.spark.SparkEnv$.create(SparkEnv.scala:345)
> at org.apache.spark.SparkEnv$.createDriverEnv(SparkEnv.scala:193)
> at 
> org.apache.spark.SparkContext.createSparkEnv(SparkContext.scala:276)
> at org.apache.spark.SparkContext.(SparkContext.scala:441)
> at
> org.apache.spark.api.java.JavaSparkContext.(JavaSparkContext.scala:61)
> at
> edu.harvard.iq.text.core.spark.SparkControllerTest.testMongoRDD(SparkControllerTest.java:63)
>
> Thanks,
> Ellen
>
>
>
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/Exception-initializing-JavaSparkContext-tp24755.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>



-- 
Marcelo

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: DataGenerator for streaming application

2015-09-21 Thread Saiph Kappa
Thanks a lot. Now it's working fine. I wasn't aware of "socketTextStream",
not sure if it was documented in the spark programming guide.
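For the archive, a minimal receiver-side sketch using socketTextStream in place of rawSocketStream (host, port and batch interval are placeholders); socketTextStream splits the incoming bytes on newlines, which matches the generator's out.println(line) framing:

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Duration, StreamingContext}

val sparkConf = new SparkConf().setAppName("BenchMark").setMaster("local[*]")
val ssc = new StreamingContext(sparkConf, Duration(1000))   // 1 s batches, placeholder

val lines = ssc.socketTextStream("localhost", 9999, StorageLevel.MEMORY_ONLY_SER)
lines.count().map(c => s"Received $c records").print()

ssc.start()
ssc.awaitTermination()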

On Mon, Sep 21, 2015 at 12:46 PM, Hemant Bhanawat 
wrote:

> Why are you using  rawSocketStream to read the data? I believe
> rawSocketStream waits for a big chunk of data before it can start
> processing it. I think what you are writing is a String and you should use
> socketTextStream which reads the data on a per line basis.
>
> On Sun, Sep 20, 2015 at 9:56 AM, Saiph Kappa 
> wrote:
>
>> Hi,
>>
>> I am trying to build a data generator that feeds a streaming application.
>> This data generator just reads a file and send its lines through a socket.
>> I get no errors on the logs, and the benchmark bellow always prints
>> "Received 0 records". Am I doing something wrong?
>>
>>
>> object MyDataGenerator {
>>
>>   def main(args: Array[String]) {
>> if (args.length != 3) {
>>   System.err.println("Usage: RawTextSender   ")
>>   System.exit(1)
>> }
>> // Parse the arguments using a pattern match
>> val (port, file, sleepMillis) = (args(0).toInt, args(1), args(2).toInt)
>>
>> val serverSocket = new ServerSocket(port)
>> println("Listening on port " + port)
>>
>>
>> while (true) {
>>   val socket = serverSocket.accept()
>>   println("Got a new connection")
>>
>>
>>   val out = new PrintWriter(socket.getOutputStream)
>>   try {
>> var count = 0
>> var startTimestamp = -1
>> for (line <- Source.fromFile(file).getLines()) {
>>   val ts = line.substring(2, line.indexOf(',',2)).toInt
>>   if(startTimestamp < 0)
>> startTimestamp = ts
>>
>>   if(ts - startTimestamp <= 30) {
>> out.println(line)
>> count += 1
>>   } else {
>> println(s"Emmited reports: $count")
>> count = 0
>> out.flush()
>> startTimestamp = ts
>> Thread.sleep(sleepMillis)
>>   }
>> }
>>   } catch {
>> case e: IOException =>
>>   println("Client disconnected")
>>   socket.close()
>>   }
>> }
>> }
>> }
>>
>>
>>
>> object Benchmark {
>>   def main(args: Array[String]) {
>> if (args.length != 4) {
>>   System.err.println("Usage: RawNetworkGrep
>> ")
>>   System.exit(1)
>> }
>>
>> val (numStreams, host, port, batchMillis) = (args(0).toInt, args(1), 
>> args(2).toInt, args(3).toInt)
>> val sparkConf = new SparkConf()
>> sparkConf.setAppName("BenchMark")
>> 
>> sparkConf.setJars(Array("target/scala-2.10/benchmark-app_2.10-0.1-SNAPSHOT.jar"))
>> sparkConf.set("spark.serializer", 
>> "org.apache.spark.serializer.KryoSerializer")
>> sparkConf.set("spark.executor.extraJavaOptions", " 
>> -XX:+UseCompressedOops -XX:+UseConcMarkSweepGC -XX:+AggressiveOpts 
>> -XX:FreqInlineSize=300 -XX:MaxInlineSize=300 ")
>> if (sparkConf.getOption("spark.master") == None) {
>>   // Master not set, as this was not launched through Spark-submit. 
>> Setting master as local."
>>   sparkConf.setMaster("local[*]")
>> }
>>
>> // Create the context
>> val ssc = new StreamingContext(sparkConf, Duration(batchMillis))
>>
>> val rawStreams = (1 to numStreams).map(_ =>
>>   ssc.rawSocketStream[String](host, port, 
>> StorageLevel.MEMORY_ONLY_SER)).toArray
>> val union = ssc.union(rawStreams)
>> union.count().map(c => s"Received $c records").print()
>> ssc.start()
>> ssc.awaitTermination()
>>   }
>> }
>>
>> Thanks.
>>
>>
>


Re: Spark SQL DataFrame 1.5.0 is extremely slow for take(1) or head() or first()

2015-09-21 Thread Yin Huai
Looks like the problem is df.rdd does not work very well with limit. In
scala, df.limit(1).rdd will also trigger the issue you observed. I will add
this in the jira.

On Mon, Sep 21, 2015 at 10:44 AM, Jerry Lam  wrote:

> I just noticed you found 1.4 has the same issue. I added that as well in
> the ticket.
>
> On Mon, Sep 21, 2015 at 1:43 PM, Jerry Lam  wrote:
>
>> Hi Yin,
>>
>> You are right! I just tried the scala version with the above lines, it
>> works as expected.
>> I'm not sure if it happens also in 1.4 for pyspark but I thought the
>> pyspark code just calls the scala code via py4j. I didn't expect that this
>> bug is pyspark specific. That surprises me actually a bit. I created a
>> ticket for this (SPARK-10731
>> ).
>>
>> Best Regards,
>>
>> Jerry
>>
>>
>> On Mon, Sep 21, 2015 at 1:01 PM, Yin Huai  wrote:
>>
>>> btw, does 1.4 has the same problem?
>>>
>>> On Mon, Sep 21, 2015 at 10:01 AM, Yin Huai  wrote:
>>>
 Hi Jerry,

 Looks like it is a Python-specific issue. Can you create a JIRA?

 Thanks,

 Yin

 On Mon, Sep 21, 2015 at 8:56 AM, Jerry Lam 
 wrote:

> Hi Spark Developers,
>
> I just ran some very simple operations on a dataset. I was surprise by
> the execution plan of take(1), head() or first().
>
> For your reference, this is what I did in pyspark 1.5:
> df=sqlContext.read.parquet("someparquetfiles")
> df.head()
>
> The above lines take over 15 minutes. I was frustrated because I can
> do better without using spark :) Since I like spark, so I tried to figure
> out why. It seems the dataframe requires 3 stages to give me the first 
> row.
> It reads all data (which is about 1 billion rows) and run Limit twice.
>
> Instead of head(), show(1) runs much faster. Not to mention that if I
> do:
>
> df.rdd.take(1) //runs much faster.
>
> Is this expected? Why head/first/take is so slow for dataframe? Is it
> a bug in the optimizer? or I did something wrong?
>
> Best Regards,
>
> Jerry
>


>>>
>>
>


Spark Streaming and Kafka MultiNode Setup - Data Locality

2015-09-21 Thread Ashish Soni
Hi All ,

Just wanted to find out if there is an benefits to installing  kafka
brokers and spark nodes on the same machine ?

is it possible that spark can pull data from kafka if it is local to the
node i.e. the broker or partition is on the same machine.

Thanks,
Ashish


Re: Docker/Mesos with Spark

2015-09-21 Thread Tim Chen
Hi John,

There is no other blog post yet; I'm thinking of doing a series of posts but
so far haven't had time to do that yet.

Running Spark in docker containers makes distributing Spark versions easy:
it's simple to upgrade, and the image is automatically cached on the slaves so the
same image just runs right away. Most of the docker performance overhead is usually
related to network and filesystem, but I think with the recent changes in Spark
to make the Mesos sandbox the default temp dir, the filesystem won't be a big
concern, as it's mostly writing to the mounted-in Mesos sandbox. Also, Mesos
uses host networking by default, so the network isn't affected much.

Most of the cluster mode limitation is that you need to make the spark job
files available somewhere that all the slaves can access remotely (http,
s3, hdfs, etc) or available on all slaves locally by path.

I'll try to make more doc efforts once I get my existing patches and
testing infra work done.

Let me know if you have more questions,

Tim

On Sat, Sep 19, 2015 at 5:42 AM, John Omernik  wrote:

> I was searching in the 1.5.0 docs on the Docker on Mesos capabilities and
> just found you CAN run it this way.  Are there any user posts, blog posts,
> etc on why and how you'd do this?
>
> Basically, at first I was questioning why you'd run spark in a docker
> container, i.e., if you run with tar balled executor, what are you really
> gaining?  And in this setup, are you losing out on performance somehow? (I
> am guessing smarter people than I have figured that out).
>
> Then I came along a situation where I wanted to use a python library with
> spark, and it had to be installed on every node, and I realized one big
> advantage of dockerized spark would be that spark apps that needed other
> libraries could be contained and built well.
>
> OK, that's huge, let's do that.  For my next question there are lot of
> "questions" have on how this actually works.  Does Clustermode/client mode
> apply here? If so, how?  Is there a good walk through on getting this
> setup? Limitations? Gotchas?  Should I just dive in an start working with
> it? Has anyone done any stories/rough documentation? This seems like a
> really helpful feature to scaling out spark, and letting developers truly
> build what they need without tons of admin overhead, so I really want to
> explore.
>
> Thanks!
>
> John
>


Re: Spark SQL DataFrame 1.5.0 is extremely slow for take(1) or head() or first()

2015-09-21 Thread Yin Huai
Hi Jerry,

Looks like it is a Python-specific issue. Can you create a JIRA?

Thanks,

Yin

On Mon, Sep 21, 2015 at 8:56 AM, Jerry Lam  wrote:

> Hi Spark Developers,
>
> I just ran some very simple operations on a dataset. I was surprise by the
> execution plan of take(1), head() or first().
>
> For your reference, this is what I did in pyspark 1.5:
> df=sqlContext.read.parquet("someparquetfiles")
> df.head()
>
> The above lines take over 15 minutes. I was frustrated because I can do
> better without using spark :) Since I like spark, so I tried to figure out
> why. It seems the dataframe requires 3 stages to give me the first row. It
> reads all data (which is about 1 billion rows) and run Limit twice.
>
> Instead of head(), show(1) runs much faster. Not to mention that if I do:
>
> df.rdd.take(1) //runs much faster.
>
> Is this expected? Why head/first/take is so slow for dataframe? Is it a
> bug in the optimizer? or I did something wrong?
>
> Best Regards,
>
> Jerry
>


Re: Spark SQL DataFrame 1.5.0 is extremely slow for take(1) or head() or first()

2015-09-21 Thread Yin Huai
btw, does 1.4 has the same problem?

On Mon, Sep 21, 2015 at 10:01 AM, Yin Huai  wrote:

> Hi Jerry,
>
> Looks like it is a Python-specific issue. Can you create a JIRA?
>
> Thanks,
>
> Yin
>
> On Mon, Sep 21, 2015 at 8:56 AM, Jerry Lam  wrote:
>
>> Hi Spark Developers,
>>
>> I just ran some very simple operations on a dataset. I was surprise by
>> the execution plan of take(1), head() or first().
>>
>> For your reference, this is what I did in pyspark 1.5:
>> df=sqlContext.read.parquet("someparquetfiles")
>> df.head()
>>
>> The above lines take over 15 minutes. I was frustrated because I can do
>> better without using spark :) Since I like spark, so I tried to figure out
>> why. It seems the dataframe requires 3 stages to give me the first row. It
>> reads all data (which is about 1 billion rows) and run Limit twice.
>>
>> Instead of head(), show(1) runs much faster. Not to mention that if I do:
>>
>> df.rdd.take(1) //runs much faster.
>>
>> Is this expected? Why head/first/take is so slow for dataframe? Is it a
>> bug in the optimizer? or I did something wrong?
>>
>> Best Regards,
>>
>> Jerry
>>
>
>


Re: spark + parquet + schema name and metadata

2015-09-21 Thread Cheng Lian
Currently Spark SQL doesn't support customizing the schema name and 
metadata. May I know why these two matter in your use case? Some 
Parquet data models, like parquet-avro, do support it, while some others 
don't (e.g. parquet-hive).


Cheng

On 9/21/15 7:13 AM, Borisa Zivkovic wrote:

Hi,

I am trying to figure out how to write parquet metadata when 
persisting DataFrames to parquet using Spark (1.4.1)


I could not find a way to change schema name (which seems to be 
hardcoded to root) and also how to add data to key/value metadata in 
parquet footer.


org.apache.parquet.hadoop.metadata.FileMetaData#getKeyValueMetaData

org.apache.parquet.schema.Type#getName

thanks





-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: How to get a new RDD by ordinarily subtract its adjacent rows

2015-09-21 Thread Zhiliang Zhu
Hi Sujit,
I must appreciate your kind help very much~
It seems to be OK; however, do you know the corresponding Spark Java API
way to achieve it... Is there any Java API like Scala's sliding? It also seemed that I could
not find the Spark Scala doc about sliding ...
Thank you very much~
Zhiliang


 On Monday, September 21, 2015 11:48 PM, Sujit Pal  
wrote:
   

 Hi Zhiliang, 
Would something like this work?
val rdd2 = rdd1.sliding(2).map(v => v(1) - v(0))
-sujit

On Mon, Sep 21, 2015 at 7:58 AM, Zhiliang Zhu  
wrote:

Hi Romi,
Thanks very much for your kind help comment~~
In fact there is some valid backgroud of the application, it is about R data 
analysis #fund_nav_daily is a M X N (or M X 1) matrix or data.frame, col is 
each daily fund return, row is the daily date#fund_return_daily needs to count 
the each fund's daily return subtracted the previous day's return 
fund_return_daily <- diff(log(fund_nav_daily)) 
#the first row would be all 0, since there is no previous row ahead first row
fund_return_daily <- rbind(matrix(0,ncol = ncol(fund_return_daily)), 
fund_return_daily) ...
I need to exactly code the R program by way of spark, then RDD/DataFrame is 
used to replace R data.frame, however, I just found that it is VERY MUCH 
diffcult to make the spark program to flexibly descript & transform R backgroud 
applications.I think I have seriously lost myself into risk about this...
Would you help direct me some about the above coding issue... and my risk about 
practice in spark/R application...
I must show all my sincere thanks torwards your kind help.

P.S. currently sparkR in spark 1.4.1 , there is many bug in the API 
createDataFrame/except/unionAll, and it seemsthat spark Java has more functions 
than sparkR.Also, no specific R regression algorithmn is including in sparkR .
Best Regards,Zhiliang


 On Monday, September 21, 2015 7:36 PM, Romi Kuntsman  
wrote:
   

 RDD is a set of data rows (in your case numbers), there is no meaning for the 
order of the items.
What exactly are you trying to accomplish?

Romi Kuntsman, Big Data Engineer
http://www.totango.com

On Mon, Sep 21, 2015 at 2:29 PM, Zhiliang Zhu  
wrote:

Dear , 

I have took lots of days to think into this issue, however, without any 
success...I shall appreciate your all kind help.
There is an RDD rdd1, I would like get a new RDD rdd2, each row in 
rdd2[ i ] = rdd1[ i ] - rdd[i - 1] .What kinds of API or function would I use...

Thanks very much!John




   



  

Re: How to get a new RDD by ordinarily subtract its adjacent rows

2015-09-21 Thread Zhiliang Zhu
Hi Sujit,
Thanks very much for your kind help. I have found the sliding doc in both the Scala
and Java Spark APIs; it is from the MLlib RDDFunctions, though in the doc there are
not enough examples.
Best Regards,
Zhiliang

 


 On Monday, September 21, 2015 11:48 PM, Sujit Pal  
wrote:
   

 Hi Zhiliang, 
Would something like this work?
val rdd2 = rdd1.sliding(2).map(v => v(1) - v(0))
-sujit

On Mon, Sep 21, 2015 at 7:58 AM, Zhiliang Zhu  
wrote:

Hi Romi,
Thanks very much for your kind help comment~~
In fact there is some valid backgroud of the application, it is about R data 
analysis #fund_nav_daily is a M X N (or M X 1) matrix or data.frame, col is 
each daily fund return, row is the daily date#fund_return_daily needs to count 
the each fund's daily return subtracted the previous day's return 
fund_return_daily <- diff(log(fund_nav_daily)) 
#the first row would be all 0, since there is no previous row ahead first row
fund_return_daily <- rbind(matrix(0,ncol = ncol(fund_return_daily)), 
fund_return_daily) ...
I need to exactly code the R program by way of spark, then RDD/DataFrame is 
used to replace R data.frame, however, I just found that it is VERY MUCH 
diffcult to make the spark program to flexibly descript & transform R backgroud 
applications.I think I have seriously lost myself into risk about this...
Would you help direct me some about the above coding issue... and my risk about 
practice in spark/R application...
I must show all my sincere thanks torwards your kind help.

P.S. currently sparkR in spark 1.4.1 , there is many bug in the API 
createDataFrame/except/unionAll, and it seemsthat spark Java has more functions 
than sparkR.Also, no specific R regression algorithmn is including in sparkR .
Best Regards,Zhiliang


 On Monday, September 21, 2015 7:36 PM, Romi Kuntsman  
wrote:
   

 RDD is a set of data rows (in your case numbers), there is no meaning for the 
order of the items.
What exactly are you trying to accomplish?

Romi Kuntsman, Big Data Engineer
http://www.totango.com

On Mon, Sep 21, 2015 at 2:29 PM, Zhiliang Zhu  
wrote:

Dear , 

I have took lots of days to think into this issue, however, without any 
success...I shall appreciate your all kind help.
There is an RDD rdd1, I would like get a new RDD rdd2, each row in 
rdd2[ i ] = rdd1[ i ] - rdd[i - 1] .What kinds of API or function would I use...

Thanks very much!John




   



  

Re: How to get a new RDD by ordinarily subtract its adjacent rows

2015-09-21 Thread Sujit Pal
Hi Zhiliang,

Haven't used the Java API but found this Javadoc page, may be helpful to
you.

https://spark.apache.org/docs/1.3.1/api/java/org/apache/spark/mllib/rdd/RDDFunctions.html

I think the equivalent Java code snippet might go something like this:

RDDFunctions.fromRDD(rdd1, ClassTag$.apply(Class)).sliding(2)

(the second parameter of fromRDD comes from this discussion thread).
http://apache-spark-user-list.1001560.n3.nabble.com/how-to-construct-a-ClassTag-object-as-a-method-parameter-in-Java-td6768.html

There is also the SlidingRDD decorator:
https://spark.apache.org/docs/1.3.1/api/java/org/apache/spark/mllib/rdd/SlidingRDD.html

So maybe something like this:

new SlidingRDD(rdd1, 2, ClassTag$.apply(Class))

-sujit
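To tie the sliding approach back to the R fund_return_daily snippet quoted in this thread, here is a rough Scala sketch for a single fund (one column); fundNavDaily and its sample values are made up, and it assumes the RDD's element order is the date order:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.rdd.RDDFunctions._   // sliding()

val sc = new SparkContext(new SparkConf().setAppName("fund-return-sketch").setMaster("local[*]"))

val fundNavDaily = sc.parallelize(Seq(100.0, 101.0, 103.0, 102.5))   // made-up daily NAVs

// diff(log(fund_nav_daily)) from the R code
val diffs = fundNavDaily.map(math.log).sliding(2).map(w => w(1) - w(0))

// rbind(0, ...) from the R code: prepend a 0.0 for the first day
// (union keeps partition order here, but this is only safe as a sketch)
val fundReturnDaily = sc.parallelize(Seq(0.0), 1) ++ diffs

fundReturnDaily.collect().foreach(println)
sc.stop()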

On Mon, Sep 21, 2015 at 9:16 AM, Zhiliang Zhu  wrote:

> Hi Sujit,
>
> I must appreciate your kind help very much~
>
> It seems to be OK, however, do you know the corresponding spark Java API
> achievement...
> Is there any java API as scala sliding, and it seemed that I do not find
> spark scala's doc about sliding ...
>
> Thank you very much~
> Zhiliang
>
>
>
> On Monday, September 21, 2015 11:48 PM, Sujit Pal 
> wrote:
>
>
> Hi Zhiliang,
>
> Would something like this work?
>
> val rdd2 = rdd1.sliding(2).map(v => v(1) - v(0))
>
> -sujit
>
>
> On Mon, Sep 21, 2015 at 7:58 AM, Zhiliang Zhu  > wrote:
>
> Hi Romi,
>
> Thanks very much for your kind help comment~~
>
> In fact there is some valid backgroud of the application, it is about R
> data analysis.
> ...
> #fund_nav_daily is a M X N (or M X 1) matrix or data.frame, col is each
> daily fund return, row is the daily date
> #fund_return_daily needs to count the each fund's daily return subtracted
> the previous day's return
> fund_return_daily <- diff(log(fund_nav_daily))
>
> #the first row would be all 0, since there is no previous row ahead first
> row
> fund_return_daily <- rbind(matrix(0,ncol = ncol(fund_return_daily)),
> fund_return_daily)
> ...
>
> I need to exactly code the R program by way of spark, then RDD/DataFrame
> is used to replace R data.frame,
> however, I just found that it is VERY MUCH diffcult to make the spark
> program to flexibly descript & transform R backgroud applications.
> I think I have seriously lost myself into risk about this...
>
> Would you help direct me some about the above coding issue... and my risk
> about practice in spark/R application...
>
> I must show all my sincere thanks torwards your kind help.
>
> P.S. currently sparkR in spark 1.4.1 , there is many bug in the API
> createDataFrame/except/unionAll, and it seems
> that spark Java has more functions than sparkR.
> Also, no specific R regression algorithmn is including in sparkR .
>
> Best Regards,
> Zhiliang
>
>
> On Monday, September 21, 2015 7:36 PM, Romi Kuntsman 
> wrote:
>
>
> RDD is a set of data rows (in your case numbers), there is no meaning for
> the order of the items.
> What exactly are you trying to accomplish?
>
> *Romi Kuntsman*, *Big Data Engineer*
> http://www.totango.com
>
> On Mon, Sep 21, 2015 at 2:29 PM, Zhiliang Zhu  > wrote:
>
> Dear ,
>
> I have took lots of days to think into this issue, however, without any
> success...
> I shall appreciate your all kind help.
>
> There is an RDD rdd1, I would like get a new RDD rdd2, each row
> in rdd2[ i ] = rdd1[ i ] - rdd[i - 1] .
> What kinds of API or function would I use...
>
>
> Thanks very much!
> John
>
>
>
>
>
>
>
>


sqlContext.read.avro broadcasting files from the driver

2015-09-21 Thread Daniel Haviv
Hi,
I'm loading a 1000 files using the spark-avro package:
val df = sqlContext.read.avro("/incoming/")

When I'm performing an action on this df it seems like for each file a
broadcast is being created and is sent to the workers (instead of the
workers reading their data-local files):

scala> df.coalesce(4).count
15/09/21 15:11:32 INFO storage.MemoryStore: ensureFreeSpace(261920) called
with curMem=0, maxMem=2223023063
15/09/21 15:11:32 INFO storage.MemoryStore: Block broadcast_0 stored as
values in memory (estimated size 255.8 KB, free 2.1 GB)
15/09/21 15:11:32 INFO storage.MemoryStore: ensureFreeSpace(22987) called
with curMem=261920, maxMem=2223023063
15/09/21 15:11:32 INFO storage.MemoryStore: Block broadcast_0_piece0 stored
as bytes in memory (estimated size 22.4 KB, free 2.1 GB)
15/09/21 15:11:32 INFO storage.BlockManagerInfo: Added broadcast_0_piece0
in memory on 192.168.3.4:39736 (size: 22.4 KB, free: 2.1 GB)



15/09/21 15:12:45 INFO storage.MemoryStore: ensureFreeSpace(22987) called
with curMem=294913622, maxMem=2223023063
15/09/21 15:12:45 INFO storage.MemoryStore: Block
broadcast_1034_piece0 stored
as bytes in memory (estimated size 22.4 KB, free 1838.8 MB)
15/09/21 15:12:45 INFO storage.BlockManagerInfo: Added
broadcast_1034_piece0 in memory on 192.168.3.4:39736 (size: 22.4 KB, free:
2.0 GB)
15/09/21 15:12:45 INFO spark.SparkContext: Created broadcast 1034 from
hadoopFile at AvroRelation.scala:121
15/09/21 15:12:46 INFO execution.Exchange: Using SparkSqlSerializer2.
15/09/21 15:12:46 INFO spark.SparkContext: Starting job: count at
:25

Am I understanding this wrong?

Thank you.
Daniel


spark + parquet + schema name and metadata

2015-09-21 Thread Borisa Zivkovic
Hi,

I am trying to figure out how to write parquet metadata when persisting
DataFrames to parquet using Spark (1.4.1)

I could not find a way to change schema name (which seems to be hardcoded
to root) and also how to add data to key/value metadata in parquet footer.

org.apache.parquet.hadoop.metadata.FileMetaData#getKeyValueMetaData

org.apache.parquet.schema.Type#getName

thanks


Re: passing SparkContext as parameter

2015-09-21 Thread Cody Koeninger
That isn't accurate, I think you're confused about foreach.

Look at

http://spark.apache.org/docs/latest/streaming-programming-guide.html#design-patterns-for-using-foreachrdd
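Condensing the pattern from that page into a sketch (dstream is assumed to be a DStream of the records from the question; createConnection and send are hypothetical stand-ins for the Cassandra calls): the per-record code runs on the executors, so it has to open its own connections there instead of touching the SparkContext.

dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    // This block runs on an executor: one connection per partition, reused for all records.
    val connection = createConnection()                            // hypothetical helper
    partitionOfRecords.foreach(record => send(connection, record)) // hypothetical write/read call
    connection.close()
  }
}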


On Mon, Sep 21, 2015 at 7:36 AM, Romi Kuntsman  wrote:

> foreach is something that runs on the driver, not the workers.
>
> if you want to perform some function on each record from cassandra, you
> need to do cassandraRdd.map(func), which will run distributed on the spark
> workers
>
> *Romi Kuntsman*, *Big Data Engineer*
> http://www.totango.com
>
> On Mon, Sep 21, 2015 at 3:29 PM, Priya Ch 
> wrote:
>
>> Yes, but i need to read from cassandra db within a spark
>> transformation..something like..
>>
>> dstream.forachRDD{
>>
>> rdd=> rdd.foreach {
>>  message =>
>>  sc.cassandraTable()
>>   .
>>   .
>>   .
>> }
>> }
>>
>> Since rdd.foreach gets executed on workers, how can i make sparkContext
>> available on workers ???
>>
>> Regards,
>> Padma Ch
>>
>> On Mon, Sep 21, 2015 at 5:10 PM, Ted Yu  wrote:
>>
>>> You can use broadcast variable for passing connection information.
>>>
>>> Cheers
>>>
>>> On Sep 21, 2015, at 4:27 AM, Priya Ch 
>>> wrote:
>>>
>>> can i use this sparkContext on executors ??
>>> In my application, i have scenario of reading from db for certain
>>> records in rdd. Hence I need sparkContext to read from DB (cassandra in our
>>> case),
>>>
>>> If sparkContext couldn't be sent to executors , what is the workaround
>>> for this ??
>>>
>>> On Mon, Sep 21, 2015 at 3:06 PM, Petr Novak 
>>> wrote:
>>>
 add @transient?

 On Mon, Sep 21, 2015 at 11:27 AM, Priya Ch <
 learnings.chitt...@gmail.com> wrote:

> Hello All,
>
> How can i pass sparkContext as a parameter to a method in an
> object. Because passing sparkContext is giving me TaskNotSerializable
> Exception.
>
> How can i achieve this ?
>
> Thanks,
> Padma Ch
>


>>>
>>
>


Count for select not matching count for group by

2015-09-21 Thread Michael Kelly
Hi,

I'm seeing some strange behaviour with spark 1.5, I have a dataframe
that I have built from loading and joining some hive tables stored in
s3.

The dataframe is cached in memory, using df.cache.

What I'm seeing is that the counts I get when I do a group by on a
column are different from what I get when I filter/select and count.

df.select("outcome").groupBy("outcome").count.show
outcome | count
--
'A'   |  100
'B'   |  200

df.filter("outcome = 'A'").count
# 50

df.filter(df("outcome") === "A").count
# 50

I expect the count of columns that match 'A' in the groupBy to match
the count when filtering. Any ideas what might be happening?

Thanks,

Michael

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Deploying spark-streaming application on production

2015-09-21 Thread Petr Novak
In short, there is no direct support for it in Spark AFAIK. You will either
manage it in MQTT or have to add another layer of indirection - either
in-memory based (observable streams, in-mem db) or disk based (Kafka, hdfs
files, db) - which will keep your unprocessed events.

Now realizing, there is support for backpressure in v1.5.0, but I don't know
if it could be exploited, i.e. I don't know if it is possible to decouple
event reading into memory from the actual processing code in Spark so that the
latter could be swapped on the fly. Probably not without some custom-built
facility for it.

Petr
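On the backpressure point, in 1.5.0 it is enabled by a single configuration flag; whether that actually helps with redeploying without losing events is a separate question. A minimal sketch (app name, master and batch interval are placeholders):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf()
  .setAppName("mqtt-to-hdfs-sketch")
  .setMaster("local[*]")                                   // placeholder
  .set("spark.streaming.backpressure.enabled", "true")     // rate-based backpressure, new in 1.5.0
val ssc = new StreamingContext(conf, Seconds(10))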

On Mon, Sep 21, 2015 at 11:26 AM, Petr Novak  wrote:

> I should read my posts at least once to avoid so many typos. Hopefully you
> are brave enough to read through.
>
> Petr
>
> On Mon, Sep 21, 2015 at 11:23 AM, Petr Novak  wrote:
>
>> I think you would have to persist events somehow if you don't want to
>> miss them. I don't see any other option there. Either in MQTT if it is
>> supported there or routing them through Kafka.
>>
>> There is a WriteAheadLog in Spark, but you would have to decouple MQTT stream
>> reading and processing into 2 separate jobs so that you could upgrade the
>> processing one, assuming the reading one would be stable (without changes)
>> across versions. But it is problematic because there is no easy way to
>> share DStreams between jobs - you would have to develop your own facility for
>> it.
>>
>> Alternatively the reading job could save MQTT events in their most
>> raw form into files - to limit the need to change code - and then the
>> processing job would work on top of it using Spark streaming based on
>> files. I think this is inefficient and can get quite complex if you would like to
>> make it reliable.
>>
>> Basically either MQTT supports persistence (which I don't know) or there
>> is Kafka for these use cases.
>>
>> Another option, I think, would be to place observable streams in between
>> MQTT and Spark streaming with backpressure, so that you could perform the
>> upgrade before the buffers fill up.
>>
>> I'm sorry that it is not thought out well from my side, it is just a
>> brainstorm but it might lead you somewhere.
>>
>> Regards,
>> Petr
>>
>> On Mon, Sep 21, 2015 at 10:09 AM, Jeetendra Gangele > > wrote:
>>
>>> Hi All,
>>>
>>> I have an spark streaming application with batch (10 ms) which is
>>> reading the MQTT channel and dumping the data from MQTT to HDFS.
>>>
>>> So suppose if I have to deploy new application jar(with changes in spark
>>> streaming application) what is the best way to deploy, currently I am doing
>>> as below
>>>
>>> 1.killing the running streaming app using yarn application -kill ID
>>> 2. and then starting the application again
>>>
>>> Problem with above approach is since we are not persisting the events in
>>> MQTT we will miss the events for the period of deploy.
>>>
>>> how to handle this case?
>>>
>>> regards
>>> jeeetndra
>>>
>>
>>
>


Re: passing SparkContext as parameter

2015-09-21 Thread Petr Novak
add @transient?

On Mon, Sep 21, 2015 at 11:36 AM, Petr Novak  wrote:

> add @transient?
>
> On Mon, Sep 21, 2015 at 11:27 AM, Priya Ch 
> wrote:
>
>> Hello All,
>>
>> How can i pass sparkContext as a parameter to a method in an object.
>> Because passing sparkContext is giving me TaskNotSerializable Exception.
>>
>> How can i achieve this ?
>>
>> Thanks,
>> Padma Ch
>>
>
>


mongo-hadoop with Spark is slow for me, and adding nodes doesn't seem to make any noticeable difference

2015-09-21 Thread cscarioni
Hi, I appreciate any help or pointers in the right direction.

My current test scenario is the following.

I want to process a MongoDB collection, anonymising some fields on it and
store it in another Collection.

The size of the collection is around 900 GB with 2.5 million documents

Following is the code.



object Anonymizer extends SparkRunner {

  val sqlContext = new SQLContext(sc)

  MongoDBLoader(conf, sc,
"output").load(MongoHadoopImplementationReader(conf, sc, "input").rdd,
(dbObject: BSONObject) => {
  dbObject.put("add_field", "John Macclane")
  val embedded = dbObject.get("embedded").asInstanceOf[BasicDBObject]
  embedded.put("business_name", Name.first_name)
  dbObject.put("embedded", webRfq)
  val notesWrapper =
Option(dbObject.get("embedded_list").asInstanceOf[java.util.ArrayList[BasicDBObject]])
  notesWrapper match {
case Some(notes) =>
  notes.foreach((note: BasicDBObject) => {
note.put("text", Name.name)
  })
case None =>
  }
  dbObject
}
  )
}...

And




case class MongoHadoopImplementationReader(conf: com.typesafe.config.Config,
sc: SparkContext, collection: String) {
  val mongoConfig = new Configuration()

  mongoConfig.set("mongo.input.uri",
   
s"mongodb://${conf.getString("replicant.mongo_host")}:27017/${conf.getString("replicant.mongo_database")}.${collection}")
  mongoConfig.set("mongo.input.split_size", "50")
  mongoConfig.set("mongo.input.limit", "70")


  def rdd: RDD[(Object, BSONObject)] = {
val rdd = sc.newAPIHadoopRDD(
  mongoConfig,
  classOf[MongoInputFormat],
  classOf[Object],
  classOf[BSONObject])
rdd
  }

}


And 


case class MongoDBLoader(conf: com.typesafe.config.Config, sc:SparkContext,
collection: String) {

  val mongoConfig = new Configuration()

  mongoConfig.set("mongo.output.uri",
   
s"mongodb://${conf.getString("replicant.mongo_host")}:27017/${conf.getString("replicant.mongo_output_database")}.${collection}")

  def load(rdd: => RDD[(Object, BSONObject)], transformer: (BSONObject) =>
BSONObject) = {

val mongoRDD = rdd.map[(Object, BSONObject)]((tuple: (Object,
BSONObject)) => {
  (null, transformer(tuple._2))
})

mongoRDD.saveAsNewAPIHadoopFile(
  "file:///this-is-completely-unused",
  classOf[Object],
  classOf[BSONObject],
  classOf[MongoOutputFormat[Object, BSONObject]],
  mongoConfig)
  }
}


This code runs slowly, taking 9.5 hours on a 3-machine cluster to process everything.
And after 6 hours on a 30-machine cluster I stopped it, as it was only about
half processed.

The machines are ec2 m3.large instances. The MongoDB lives on another EC2
instance inside the same VPC and same subnet.

I tried to look into the configuration options but it seems that in most
cases the defaults are the way to go (number of cores, memory, etc). 

It looks like I have some bottleneck somewhere, but I'm not sure at all. And I
am thinking Mongo is not able to handle the parallelism?

How are the RDDs stored in memory? When I run it, I see I get around 32000
partitions and tasks created. Then the processing seems to slow down
as it advances (this can be due to the mongo documents being bigger in the
second half of our DB).

I see as well that the split is stored in HDFS by Spark and then read and
bulk-inserted into Mongo. However there is a lot of HDFS space (like 30 gigs
per machine) but just a tiny fraction is used. Wouldn't it be better to fill
this more and only try to insert into mongo when more data is available?

I also tried to increase the split size, but it complains of not enough
resources on the worker. However I don't think the splits are big enough to
actually fill the 6GB of memory of each node, as what it stores on HDFS
is a lot less than that.

Is there anything obvious (or not :)) that I am not doing correctly? Is
this the correct way to transform a collection from Mongo to Mongo? Is
there another way?




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/mongo-hadoop-with-Spark-is-slow-for-me-and-adding-nodes-doesn-t-seem-to-make-any-noticeable-differene-tp24754.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark + Druid

2015-09-21 Thread Petr Novak
Great work.

On Fri, Sep 18, 2015 at 6:51 PM, Harish Butani 
wrote:

> Hi,
>
> I have just posted a Blog on this:
> https://www.linkedin.com/pulse/combining-druid-spark-interactive-flexible-analytics-scale-butani
>
> regards,
> Harish Butani.
>
> On Tue, Sep 1, 2015 at 11:46 PM, Paolo Platter 
> wrote:
>
>> Fantastic!!! I will look into that and I hope to contribute
>>
>> Paolo
>>
>> Inviata dal mio Windows Phone
>> --
>> Da: Harish Butani 
>> Inviato: ‎02/‎09/‎2015 06:04
>> A: user 
>> Oggetto: Spark + Druid
>>
>> Hi,
>>
>> I am working on the Spark Druid Package:
>> https://github.com/SparklineData/spark-druid-olap.
>> For scenarios where a 'raw event' dataset is being indexed in Druid it
>> enables you to write your Logical Plans(queries/dataflows) against the 'raw
>> event' dataset and it rewrites parts of the plan to execute as a Druid
>> Query. In Spark the configuration of a Druid DataSource is somewhat like
>> configuring an OLAP index in a traditional DB. Early results show
>> significant speedup of pushing slice and dice queries to Druid.
>>
>> It comprises a Druid DataSource that wraps the 'raw event' dataset and
>> has knowledge of the Druid Index; and a DruidPlanner which is a set of plan
>> rewrite strategies to convert Aggregation queries into a Plan having a
>> DruidRDD.
>>
>> Here is a detailed design document, which also describes a benchmark of
>> representative queries on the TPCH dataset.
>>
>> Looking for folks who would be willing to try this out and/or contribute.
>>
>> regards,
>> Harish Butani.
>>
>
>


spark with internal ip

2015-09-21 Thread ZhuGe
Hi there,
We recently added a second NIC to each node of the cluster (standalone) for
larger bandwidth, and we modified the /etc/hosts file so that the hostname points to
the new NIC's (internal) IP address. What we want to achieve is that
communication between nodes goes through the new NIC.
The cluster starts properly. However, we cannot remotely
access the UI, as the UI is bound to the internal IP address.
Is there any configuration that could help me solve this issue?

Cheers, Ge Zhu

Re: how to get RDD from two different RDDs with cross column

2015-09-21 Thread Romi Kuntsman
Hi,
If I understand correctly:
rdd1 contains keys (of type StringDate)
rdd2 contains keys and values
and rdd3 contains all the keys, and the values from rdd2?

I think you should make rdd1 and rdd2 PairRDD, and then use outer join.
Does that make sense?
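
For example, something like this (an untested sketch in the spark-shell, so sc is
available; the sample dates and values are made up, not from your data):

    import org.apache.spark.rdd.RDD

    val rdd1: RDD[String] = sc.parallelize(Seq("2015-09-01", "2015-09-02", "2015-09-03"))
    val rdd2: RDD[(String, Float)] = sc.parallelize(Seq("2015-09-01" -> 1.5f, "2015-09-03" -> 2.0f))

    // Key rdd1 with a dummy value, then left-outer-join so every StringDate from rdd1
    // survives; dates missing from rdd2 end up as None (the "NULL" case).
    val rdd3: RDD[(String, Option[Float])] =
      rdd1.map(date => (date, ())).leftOuterJoin(rdd2).mapValues(_._2)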

On Mon, Sep 21, 2015 at 8:37 PM Zhiliang Zhu  wrote:

> Dear Romi, Priya, Sujt and Shivaram and all,
>
> I have took lots of days to think into this issue, however, without  any
> enough good solution...
> I shall appreciate your all kind help.
>
> There is an RDD rdd1, and another RDD rdd2
> (rdd2 can be a PairRDD, or a DataFrame with two columns, StringDate and float).
> The StringDate column values from rdd1 and rdd2 overlap but are not the same.
>
> I would like to get a new RDD rdd3: the StringDate values in rdd3
> would all come from (be the same as) rdd1, and the float in rdd3 would come from rdd2 if
> the StringDate is in rdd2, or else NULL would be assigned.
> That is, for each row in rdd3[ i ], if
> rdd2[ i ].StringDate is the same as rdd1[ i ].StringDate,
> then rdd2[ i ].float is assigned to rdd3[ i ].
> What kinds of API or function would I use...
>
> Thanks very much!
> Zhiliang
>
>
>


Slow Performance with Apache Spark Gradient Boosted Tree training runs

2015-09-21 Thread vkutsenko
I'm experimenting with the Gradient Boosted Trees learning
algorithm from the ML library of Spark 1.4. I'm solving a binary classification
problem where my input is ~50,000 samples and ~500,000 features. My goal is
to output the definition of the resulting GBT ensemble in human-readable
format. My experience so far is that for my problem size adding more
resources to the cluster seems to not have an effect on the length of the
run. A 10-iteration training run seem to roughly take 13hrs. This isn't
acceptable since I'm looking to do 100-300 iteration runs, and the execution
time seems to explode with the number of iterations.

*My Spark application*
This isn't the exact code, but it can be reduced to:

SparkConf sc = new SparkConf().setAppName("GBT Trainer")
    // Unlimited max result size for intermediate Map-Reduce ops.
    // Having no limit is probably bad, but I've not had time to find
    // a tighter upper bound and the default value wasn't sufficient.
    .set("spark.driver.maxResultSize", "0");
JavaSparkContext jsc = new JavaSparkContext(sc);

// The input file is encoded in plain-text LIBSVM format, ~59GB in size
JavaRDD<LabeledPoint> data = MLUtils.loadLibSVMFile(jsc.sc(),
    "s3://somebucket/somekey/plaintext_libsvm_file").toJavaRDD();

BoostingStrategy boostingStrategy =
    BoostingStrategy.defaultParams("Classification");
boostingStrategy.setNumIterations(10);
boostingStrategy.getTreeStrategy().setNumClasses(2);
boostingStrategy.getTreeStrategy().setMaxDepth(1);
Map<Integer, Integer> categoricalFeaturesInfo = new HashMap<Integer, Integer>();
boostingStrategy.treeStrategy().setCategoricalFeaturesInfo(categoricalFeaturesInfo);

GradientBoostedTreesModel model = GradientBoostedTrees.train(data, boostingStrategy);

// Somewhat-convoluted code below reads in the Parquet-formatted output
// of the GBT model and writes it back out as JSON.
// There might be cleaner ways of achieving the same, but since output
// size is only a few KB I feel little guilt leaving it as is.

// Serialize and output the GBT classifier model the only way that the library allows.
String outputPath = "s3://somebucket/somekeyprefex";
model.save(jsc.sc(), outputPath + "/parquet");
// Read in the Parquet-formatted classifier output as a generic DataFrame object.
SQLContext sqlContext = new SQLContext(jsc);
DataFrame outputDataFrame = sqlContext.read().parquet(outputPath + "/parquet");
// Output the DataFrame-formatted classifier model as JSON.
outputDataFrame.write().format("json").save(outputPath + "/json");

*Question*
What is the performance bottleneck with my Spark application (or with GBT
learning algorithm itself) on input of that size and how can I achieve
greater execution parallelism?

I'm still a novice Spark dev, and I'd appreciate any tips on cluster
configuration and execution profiling. 


*More details on the cluster setup*

I'm running this app on a AWS EMR cluster (emr-4.0.0, YARN cluster mode) of
r3.8xlarge instances (32 cores, 244GB RAM each). I'm using such large
instances in order to maximize flexibility of resource allocation. So far
I've tried using 1-3 r3.8xlarge instances with a variety of resource
allocation schemes between the driver and workers. For example, for a
cluster of 1 r3.8xlarge instances I submit the app as follows:

aws emr add-steps --cluster-id $1 --steps Name=$2,\
Jar=s3://us-east-1.elasticmapreduce/libs/script-runner/script-runner.jar,\
Args=[/usr/lib/spark/bin/spark-submit,--verbose,\
--deploy-mode,cluster,--master,yarn,\
--driver-memory,60G,\
--executor-memory,30G,\
--executor-cores,5,\
--num-executors,6,\
--class,GbtTrainer,\
"s3://somebucket/somekey/spark.jar"],\
ActionOnFailure=CONTINUE

For a cluster of 3 r3.8xlarge instances I tweak resource allocation:

--driver-memory,80G,\
--executor-memory,35G,\
--executor-cores,5,\
--num-executors,18,\

I don't have a clear idea of how much memory is useful to give to every
executor, but I feel that I'm being generous in either case. Looking through the
Spark UI, I'm not seeing tasks with an input size of more than a few GB. I'm
erring on the side of caution when giving the driver process so much
memory, in order to ensure that it isn't memory-starved for any intermediate
result-aggregation operations.

I'm trying to keep the number of cores per executor down to 5, as per the
suggestions in Cloudera's How To Tune Your Spark Jobs series
(according to them, more than 5 cores tends to introduce an HDFS IO
bottleneck). I'm also making sure that there is enough spare RAM and CPU
left over for the host OS and Hadoop services.

*My findings thus far*
My only clue is Spark UI showing very long Scheduling Delay for a number of

Re: Null Value in DecimalType column of DataFrame

2015-09-21 Thread Reynold Xin
+dev list

Hi Dirceu,

The answer to whether throwing an exception is better or null is better
depends on your use case. If you are debugging and want to find bugs with
your program, you might prefer throwing an exception. However, if you are
running on a large real-world dataset (i.e. data is dirty) and your query
can take a while (e.g. 30 mins), you then might prefer the system to just
assign null values to the dirty data that could lead to runtime exceptions,
because otherwise you could be spending days just to clean your data.

Postgres throws exceptions here, but I think that's mainly because it is
used for OLTP, and in those cases queries are short-running. Most other
analytic databases I believe just return null. The best we can do is to
provide a config option to indicate behavior for exception handling.
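
To make the semantics concrete, here is a tiny sketch (spark-shell, Spark 1.5
DataFrame API, where sqlContext is predefined) of the case Yin describes in the
quoted thread below:

    import org.apache.spark.sql.functions.lit
    import org.apache.spark.sql.types.DecimalType

    // DecimalType(10, 10) has precision 10 and scale 10, so it can only represent
    // values in [0, 1); a value that does not fit becomes null instead of raising.
    sqlContext.range(1).select(
      lit(10.5).cast(DecimalType(10, 10)).as("overflows"),  // null
      lit(0.1).cast(DecimalType(10, 10)).as("fits")         // 0.1000000000
    ).show()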


On Fri, Sep 18, 2015 at 8:15 AM, Dirceu Semighini Filho <
dirceu.semigh...@gmail.com> wrote:

> Hi Yin, I got that part.
> I just think that instead of returning null, throwing an exception would
> be better. In the exception message we can explain that the DecimalType
> used can't fit the number that is been converted due to the precision and
> scale values used to create it.
> It would be easier for the user to find the reason why that error is
> happening, instead of receiving an NullPointerException in another part of
> his code. We can also make a better documentation of DecimalType classes to
> explain this behavior, what do you think?
>
>
>
>
> 2015-09-17 18:52 GMT-03:00 Yin Huai :
>
>> As I mentioned before, the range of values of DecimalType(10, 10) is [0,
>> 1). If you have a value 10.5 and you want to cast it to DecimalType(10,
>> 10), I do not think there is any better returned value except of null.
>> Looks like DecimalType(10, 10) is not the right type for your use case. You
>> need a decimal type that has precision - scale >= 2.
>>
>> On Tue, Sep 15, 2015 at 6:39 AM, Dirceu Semighini Filho <
>> dirceu.semigh...@gmail.com> wrote:
>>
>>>
>>> Hi Yin, posted here because I think it's a bug.
>>> So, it will return null and I can get a nullpointerexception, as I was
>>> getting. Is this really the expected behavior? Never seen something
>>> returning null in other Scala tools that I used.
>>>
>>> Regards,
>>>
>>>
>>> 2015-09-14 18:54 GMT-03:00 Yin Huai :
>>>
 btw, move it to user list.

 On Mon, Sep 14, 2015 at 2:54 PM, Yin Huai  wrote:

> A scale of 10 means that there are 10 digits at the right of the
> decimal point. If you also have precision 10, the range of your data will
> be [0, 1) and casting "10.5" to DecimalType(10, 10) will return null, 
> which
> is expected.
>
> On Mon, Sep 14, 2015 at 1:42 PM, Dirceu Semighini Filho <
> dirceu.semigh...@gmail.com> wrote:
>
>> Hi all,
>> I'm moving from Spark 1.4 to 1.5, and one of my tests is failing.
>> It seems that there were some changes in org.apache.spark.sql.types.DecimalType.
>>
>> This ugly code is a little sample to reproduce the error; don't use
>> it in your project.
>>
>> test("spark test") {
>>   val file = 
>> context.sparkContext().textFile(s"${defaultFilePath}Test.csv").map(f => 
>> Row.fromSeq({
>> val values = f.split(",")
>> 
>> Seq(values.head.toString.toInt,values.tail.head.toString.toInt,BigDecimal(values.tail.tail.head),
>> values.tail.tail.tail.head)}))
>>
>>   val structType = StructType(Seq(StructField("id", IntegerType, false),
>> StructField("int2", IntegerType, false), StructField("double",
>>
>>  DecimalType(10,10), false),
>>
>>
>> StructField("str2", StringType, false)))
>>
>>   val df = context.sqlContext.createDataFrame(file,structType)
>>   df.first
>> }
>>
>> The content of the file is:
>>
>> 1,5,10.5,va
>> 2,1,0.1,vb
>> 3,8,10.0,vc
>>
>> The problem resides in DecimalType: before 1.5 the scale wasn't
>> required. Now when using DecimalType(12,10) it works fine, but when
>> using DecimalType(10,10) the Decimal value
>> 10.5 becomes null, while 0.1 works.
>>
>> Is there anybody working with DecimalType for 1.5.1?
>>
>> Regards,
>> Dirceu
>>
>>
>

>>>
>>>
>>
>


HiveQL Compatibility (0.12.0, 0.13.0???)

2015-09-21 Thread Dominic Ricard
Hi,
   here's a statement from the Spark 1.5.0 Spark SQL and DataFrame Guide:

*Compatibility with Apache Hive*
Spark SQL is designed to be compatible with the Hive Metastore, SerDes and
UDFs. Currently Spark SQL is based on Hive 0.12.0 and 0.13.1.

After testing many functions available in Hive 1.1.0 and 1.2.0, I tend to think
that this is no longer true...

Could someone update the documentation or tell me what these versions refer
to, as it appears that Spark SQL 1.5.0 supports everything in Hive 1.2.0...

Thank you. 



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/HiveQL-Compatibility-0-12-0-0-13-0-tp24757.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: Why are executors on slave never used?

2015-09-21 Thread Andrew Or
Hi Joshua,

What cluster manager are you using, standalone or YARN? (Note that
standalone here does not mean local mode).

If standalone, you need to do `setMaster("spark://[CLUSTER_URL]:7077")`,
where CLUSTER_URL is the machine that started the standalone Master. If
YARN, you need to do `setMaster("yarn")`, assuming that all the Hadoop
configuration files such as core-site.xml are already set up properly.
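
For example, in Scala (the standalone master hostname below is just a placeholder;
the same applies to the Python SparkConf):

    import org.apache.spark.{SparkConf, SparkContext}

    val conf = new SparkConf()
      .setAppName("my-app")
      .setMaster("spark://cluster-master-host:7077")  // or setMaster("yarn") on a YARN cluster, as above
    val sc = new SparkContext(conf)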

-Andrew


2015-09-21 8:53 GMT-07:00 Hemant Bhanawat :

> When you specify master as local[2], it starts the spark components in a
> single jvm. You need to specify the master correctly.
> I have a default AWS EMR cluster (1 master, 1 slave) with Spark. When I
> run a Spark process, it works fine -- but only on the master, as if it were
> standalone.
>
> The web-UI and logging code shows only 1 executor, the localhost.
>
> How can I diagnose this?
>
> (I create *SparkConf, *in Python, with *setMaster('local[2]'). )*
>
> (Strangely, though I don't think that this causes the problem, there is
> almost nothing spark-related on the slave machine:* /usr/lib/spark *has a
> few jars, but that's it:  *datanucleus-api-jdo.jar  datanucleus-core.jar
>  datanucleus-rdbms.jar  spark-yarn-shuffle.jar. *But this is an AWS EMR
> cluster as created by* create-cluster*, so I would assume that the slave
> and master are configured OK out-of the box.)
>
> Joshua
>


Re: Spark data type guesser UDAF

2015-09-21 Thread Ruslan Dautkhanov
Does it deserve to be a JIRA in Spark / Spark MLLib?
How do you guys normally determine data types?

Frameworks like h2o automatically determine data types by scanning a sample of
the data, or the whole dataset.
Then one can decide, e.g., whether a variable should be a categorical variable
or a numerical one.

Another use case is if you get an arbitrary data set (we get them quite
often) and want to save it as a Parquet table.
Providing correct data types makes Parquet more space-efficient (and
probably more query-time performant, e.g.
better Parquet bloom filters than just storing everything as
string/varchar).
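
Something as simple as a per-column guess over a sample is roughly what I have
in mind - just a sketch (spark-shell, made-up path, assumes well-formed CSV rows
of equal length), not the Hive UDAF approach linked below:

    import scala.util.Try

    // Try the narrowest type first; fall back to string.
    def guessType(values: Seq[String]): String =
      if (values.forall(v => Try(v.trim.toLong).isSuccess)) "bigint"
      else if (values.forall(v => Try(v.trim.toDouble).isSuccess)) "double"
      else "string"

    // Guess one type per column from a small sample of the raw text.
    val sample = sc.textFile("hdfs:///data/unknown.csv").map(_.split(",")).take(1000)
    val columnTypes = sample.transpose.map(col => guessType(col))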



-- 
Ruslan Dautkhanov

On Thu, Sep 17, 2015 at 12:32 PM, Ruslan Dautkhanov 
wrote:

> Wanted to take something like this
>
> https://github.com/fitzscott/AirQuality/blob/master/HiveDataTypeGuesser.java
> and create a Hive UDAF to create an aggregate function that returns a data
> type guess.
> Am I inventing a wheel?
> Does Spark have something like this already built-in?
> Would be very useful for new wide datasets to explore data. Would be
> helpful for ML too,
> e.g. to decide categorical vs numerical variables.
>
>
> Ruslan
>
>


Re: Exception initializing JavaSparkContext

2015-09-21 Thread Ellen Kraffmiller
I found the problem - the pom.xml I was using also contained an old
dependency on a Mahout library, which was pulling in the old hadoop-core.
Removing that fixed the problem.
Thank you!

On Mon, Sep 21, 2015 at 2:54 PM, Ted Yu  wrote:

> bq. hadoop-core-0.20.204.0
>
> How come the above got into play - it was from hadoop-1
>
> On Mon, Sep 21, 2015 at 11:34 AM, Ellen Kraffmiller <
> ellen.kraffmil...@gmail.com> wrote:
>
>> I am including the Spark core dependency in my maven pom.xml:
>>
>> 
>> org.apache.spark
>> spark-core_2.10
>> 1.5.0
>> 
>>
>> This is bringing these hadoop versions:
>> hadoop-annotations-2.2.0
>> hadoop-auth-2.2.0
>> hadoop-client-2.2.0
>> hadoop-common-2.2.0
>> hadoop-core-0.20.204.0
>> hadoop-hdfs-2.2.0
>> followed by mapreduce and yarn dependencies... let me know if you need
>> the full list.
>> Thanks,
>> Ellen
>>
>>
>> On Mon, Sep 21, 2015 at 1:48 PM, Marcelo Vanzin 
>> wrote:
>>
>>> What Spark package are you using? In particular, which hadoop version?
>>>
>>> On Mon, Sep 21, 2015 at 9:14 AM, ekraffmiller
>>>  wrote:
>>> > Hi,
>>> > I’m trying to run a simple test program to access Spark through Java. I’m
>>> > using JDK 1.8, and Spark 1.5.  I’m getting an Exception from the
>>> > JavaSparkContext constructor.  My initialization code matches all the sample
>>> > code I’ve found online, so not sure what I’m doing wrong.
>>> >
>>> > Here is my code:
>>> >
>>> > SparkConf conf = new SparkConf().setAppName("Simple Application");
>>> > conf.setMaster("local");
>>> > conf.setAppName("my app");
>>> > JavaSparkContext sc = new JavaSparkContext(conf);
>>> >
>>> > The stack trace of the Exception:
>>> >
>>> > java.lang.ExceptionInInitializerError: null
>>> > at java.lang.Class.getField(Class.java:1690)
>>> > at
>>> >
>>> org.apache.spark.util.SparkShutdownHookManager.install(ShutdownHookManager.scala:220)
>>> > at
>>> >
>>> org.apache.spark.util.ShutdownHookManager$.shutdownHooks$lzycompute(ShutdownHookManager.scala:50)
>>> > at
>>> >
>>> org.apache.spark.util.ShutdownHookManager$.shutdownHooks(ShutdownHookManager.scala:48)
>>> > at
>>> >
>>> org.apache.spark.util.ShutdownHookManager$.addShutdownHook(ShutdownHookManager.scala:189)
>>> > at
>>> >
>>> org.apache.spark.util.ShutdownHookManager$.(ShutdownHookManager.scala:58)
>>> > at
>>> >
>>> org.apache.spark.util.ShutdownHookManager$.(ShutdownHookManager.scala)
>>> > at
>>> >
>>> org.apache.spark.storage.DiskBlockManager.addShutdownHook(DiskBlockManager.scala:147)
>>> > at
>>> >
>>> org.apache.spark.storage.DiskBlockManager.(DiskBlockManager.scala:54)
>>> > at
>>> org.apache.spark.storage.BlockManager.(BlockManager.scala:75)
>>> > at
>>> org.apache.spark.storage.BlockManager.(BlockManager.scala:173)
>>> > at org.apache.spark.SparkEnv$.create(SparkEnv.scala:345)
>>> > at
>>> org.apache.spark.SparkEnv$.createDriverEnv(SparkEnv.scala:193)
>>> > at
>>> org.apache.spark.SparkContext.createSparkEnv(SparkContext.scala:276)
>>> > at org.apache.spark.SparkContext.(SparkContext.scala:441)
>>> > at
>>> >
>>> org.apache.spark.api.java.JavaSparkContext.(JavaSparkContext.scala:61)
>>> > at
>>> >
>>> edu.harvard.iq.text.core.spark.SparkControllerTest.testMongoRDD(SparkControllerTest.java:63)
>>> >
>>> > Thanks,
>>> > Ellen
>>> >
>>> >
>>> >
>>> > --
>>> > View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/Exception-initializing-JavaSparkContext-tp24755.html
>>> > Sent from the Apache Spark User List mailing list archive at
>>> Nabble.com.
>>> >
>>> > -
>>> > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> > For additional commands, e-mail: user-h...@spark.apache.org
>>> >
>>>
>>>
>>>
>>> --
>>> Marcelo
>>>
>>
>>
>


Re: passing SparkContext as parameter

2015-09-21 Thread Romi Kuntsman
Cody, that's a great reference!
As shown there - the best way to connect to an external database from the
workers is to create a connection pool on (each) worker.
The driver can pass, via broadcast, the connection string, but not the
connection object itself and not the SparkContext.
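
For example (a generic, untested sketch in the spark-shell; plain JDBC and a
made-up URL stand in for whatever client library the workers actually use - the
Cassandra connector has its own session handling):

    import java.sql.DriverManager

    val rdd = sc.parallelize(1 to 100)

    // Broadcast only the (serializable) connection string - never the connection
    // object and never the SparkContext.
    val connStr = sc.broadcast("jdbc:postgresql://db-host:5432/mydb")

    rdd.foreachPartition { records =>
      // One connection per partition, created on the worker.
      val conn = DriverManager.getConnection(connStr.value)
      try {
        records.foreach { r =>
          // read/write for each record through conn here
        }
      } finally {
        conn.close()
      }
    }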

On Mon, Sep 21, 2015 at 5:31 PM Cody Koeninger  wrote:

> That isn't accurate, I think you're confused about foreach.
>
> Look at
>
>
> http://spark.apache.org/docs/latest/streaming-programming-guide.html#design-patterns-for-using-foreachrdd
>
>
> On Mon, Sep 21, 2015 at 7:36 AM, Romi Kuntsman  wrote:
>
>> foreach is something that runs on the driver, not the workers.
>>
>> if you want to perform some function on each record from cassandra, you
>> need to do cassandraRdd.map(func), which will run distributed on the spark
>> workers
>>
>> *Romi Kuntsman*, *Big Data Engineer*
>> http://www.totango.com
>>
>> On Mon, Sep 21, 2015 at 3:29 PM, Priya Ch 
>> wrote:
>>
>>> Yes, but i need to read from cassandra db within a spark
>>> transformation..something like..
>>>
>>> dstream.foreachRDD {
>>>   rdd => rdd.foreach {
>>>     message =>
>>>       sc.cassandraTable()
>>>         ...
>>>   }
>>> }
>>>
>>> Since rdd.foreach gets executed on workers, how can i make sparkContext
>>> available on workers ???
>>>
>>> Regards,
>>> Padma Ch
>>>
>>> On Mon, Sep 21, 2015 at 5:10 PM, Ted Yu  wrote:
>>>
 You can use broadcast variable for passing connection information.

 Cheers

 On Sep 21, 2015, at 4:27 AM, Priya Ch 
 wrote:

 can i use this sparkContext on executors ??
 In my application, i have scenario of reading from db for certain
 records in rdd. Hence I need sparkContext to read from DB (cassandra in our
 case),

 If sparkContext couldn't be sent to executors , what is the workaround
 for this ??

 On Mon, Sep 21, 2015 at 3:06 PM, Petr Novak 
 wrote:

> add @transient?
>
> On Mon, Sep 21, 2015 at 11:27 AM, Priya Ch <
> learnings.chitt...@gmail.com> wrote:
>
>> Hello All,
>>
>> How can i pass sparkContext as a parameter to a method in an
>> object. Because passing sparkContext is giving me TaskNotSerializable
>> Exception.
>>
>> How can i achieve this ?
>>
>> Thanks,
>> Padma Ch
>>
>
>

>>>
>>
>


Re: Using Spark for portfolio manager app

2015-09-21 Thread Adrian Tanase
  1.  Reading from Kafka has exactly-once guarantees - we are using it in
production today (with the direct receiver).
 *   You will probably have 2 topics; loading both into Spark and joining
/ unioning as needed is not an issue (see the sketch below).
 *   There are tons of optimizations you can do there, assuming everything else works.
  2.  For ad-hoc queries I would say you absolutely need to look at external
storage.
 *   Querying the DStream or Spark's RDDs directly should be done mostly
for aggregates/metrics, not by users.
 *   If you look at HBase or Cassandra for storage then 50k writes/sec are
not a problem at all, especially combined with a smart client that does batch
puts (like async hbase).
 *   You could also consider writing the updates to another Kafka topic and
have a different component that updates the DB, if you think of other
optimisations there.
  3.  By stats I assume you mean metrics (operational or business).
 *   There are multiple ways to do this, however I would not encourage you
to query Spark directly, especially if you need an archive/history of your
datapoints.
 *   We are using OpenTSDB (we already have an HBase cluster) + Grafana for
dashboarding.
 *   Collecting the metrics is a bit hairy in a streaming app - we have
experimented with both accumulators and RDDs specific for metrics; we chose the
RDDs that write to OpenTSDB using foreachRDD.
-adrian


From: Thúy Hằng Lê 
Sent: Sunday, September 20, 2015 7:26 AM
To: Jörn Franke
Cc: user@spark.apache.org
Subject: Re: Using Spark for portfolio manager app

Thanks Adrian and Jorn for the answers.

Yes, you're right, there are a lot of things I need to consider if I want to use
Spark for my app.

I still have few concerns/questions from your information:

1/ I need to combine the trading stream with the tick stream, and I am planning to use
Kafka for that.
If I am using approach #2 (Direct Approach) in this tutorial:
https://spark.apache.org/docs/latest/streaming-kafka-integration.html


Will I receive exactly-once semantics? Or do I have to add some logic in my code to
achieve that?
As you suggest using delta updates, exactly-once semantics are required for
this application.

2/ For ad-hoc queries, I must write the output of Spark to external storage and query on
that, right?
Is there any way to do ad-hoc queries on Spark itself? My application could have 50k
updates per second at peak time.
Persisting to external storage leads to high latency in my app.

3/ How do I get real-time statistics from Spark?
In most of the Spark streaming examples, the statistics are echoed to stdout.
However, I want to display those statistics on a GUI; is there any way to retrieve
data from Spark directly without using external storage?


2015-09-19 16:23 GMT+07:00 Jörn Franke 
>:

If you want to be able to let your users query their portfolio then you may
want to think about storing the current state of the portfolios in
HBase/Phoenix; alternatively, a cluster of relational databases can make sense.
For the rest you may use Spark.

On Sat, Sep 19, 2015 at 4:43 AM, Thúy Hằng Lê
> wrote:
Hi all,

I am going to build a financial Portfolio Manager application, where each
portfolio contains a list of stocks, the number of shares purchased, and the
purchase price.
Another source of information is stock prices from market data. The application
needs to calculate the real-time gain or loss of each stock in each portfolio
(compared to the purchase price).

I am new to Spark; I know that using Spark Streaming I can aggregate portfolio
positions in real time, for example:
user A contains:
  - 100 IBM shares with transactionValue=$15000
  - 500 AAPL shares with transactionValue=$11400

Now, given that stock prices change in real time too, e.g. if the IBM price is at 151, I
want to update its gain or loss: gainOrLost(IBM) = 151*100 - 15000 = $100

My questions are:

 * What is the best method to combine 2 real-time streams (transactions
made by users and market pricing data) in Spark?
 * How can I run real-time ad-hoc SQL against the portfolio positions? Is
there any way I can do SQL on the output of Spark Streaming?
 For example,
  select sum(gainOrLost) from portfolio where user='A';
 * What are the preferred external storages for Spark in this use case?
 * Is 

Re: Spark Streaming and Kafka MultiNode Setup - Data Locality

2015-09-21 Thread Adrian Tanase
We do - using Spark streaming, Kafka, HDFS all collocated on the same nodes. 
Works great so far.


Spark picks up the location information and reads data from the partitions 
hosted by the local broker, showing up as NODE_LOCAL in the UI.

You also need to look at the locality options in the config
(spark.locality.wait and friends) - just to make sure you're not wasting time if
the Kafka cluster becomes unbalanced and there are fewer cores than partitions
on a particular node - you want to fall back to RACK_LOCAL as quickly as possible;
we've set this to 500 millis instead of the default of 3 seconds.
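
For example (an illustrative snippet, using the value quoted above rather than a
universal recommendation):

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .setAppName("kafka-colocated-app")
      .set("spark.locality.wait", "500ms")  // default is 3s; fall back to less-local levels sooner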

-adrian


From: Cody Koeninger 
Sent: Monday, September 21, 2015 10:19 PM
To: Ashish Soni
Cc: user
Subject: Re: Spark Streaming and Kafka MultiNode Setup - Data Locality

The direct stream already uses the kafka leader for a given partition as the 
preferred location.

I don't run kafka on the same nodes as spark, and I don't know anyone who does, 
so that situation isn't particularly well tested.

On Mon, Sep 21, 2015 at 1:15 PM, Ashish Soni 
> wrote:
Hi All ,

Just wanted to find out if there are any benefits to installing Kafka brokers
and Spark nodes on the same machines?

Is it possible for Spark to pull data from Kafka locally on a node,
i.e. when the broker or partition is on the same machine?

Thanks,
Ashish



Re: What's the best practice to parse JSON using spark

2015-09-21 Thread Adrian Tanase
I've been using spray-json for general 
JSON ser/deser in scala (spark app), mostly for config files and data exchange. 
Haven't used it in conjunction with jobs that process large JSON data sources, 
so can't speak for those use cases.


-adrian



From: Petr Novak 
Sent: Monday, September 21, 2015 12:11 PM
To: Cui Lin; user
Subject: Re: What's the best practice to parse JSON using spark

Surprisingly I had the same issue when including json4s dependency at the same 
version v3.2.10. I had to remove json4s deps from my code. I'm using Scala 
2.11, there might be some issue with mixing 2.10/2.11 and it could be just my 
environment. I haven't investigated much as depending on Spark provided version 
is fine for us for now.

Regards,
Petr

On Mon, Sep 21, 2015 at 11:06 AM, Petr Novak 
> wrote:
Internally Spark is using json4s and the Jackson parser v3.2.10, AFAIK. So if you
are using Scala they should be available without adding dependencies. There is
v3.2.11 already available, but adding it to my app was causing a NoSuchMethod
exception, so I would have to shade it. I'm simply staying on v3.2.10 for now.
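
For example, parsing records inside a job with the bundled json4s looks roughly
like this (untested sketch in the spark-shell; the case class, fields and path
are made up):

    import org.json4s._
    import org.json4s.jackson.JsonMethods._

    case class Event(id: Int, name: String)

    val events = sc.textFile("hdfs:///data/events.jsonl").map { line =>
      implicit val formats = DefaultFormats   // needed by extract, created per task
      parse(line).extract[Event]
    }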

Regards,
Petr

On Sat, Sep 19, 2015 at 2:45 AM, Ted Yu 
> wrote:
For #1, see this thread: http://search-hadoop.com/m/q3RTti0Thneenne2

For #2, also see:
examples//src/main/python/hbase_inputformat.py
examples//src/main/python/hbase_outputformat.py

Cheers

On Fri, Sep 18, 2015 at 5:12 PM, Ted Yu 
> wrote:
For #2, please see:

examples/src/main/scala//org/apache/spark/examples/HBaseTest.scala
examples/src/main/scala//org/apache/spark/examples/pythonconverters/HBaseConverters.scala

In hbase, there is hbase-spark module which is being polished. Should be 
available in hbase 1.3.0 release.

Cheers

On Fri, Sep 18, 2015 at 5:09 PM, Cui Lin 
> wrote:
Hello, all,

Parsing JSON's nested structure is easy when using the Java or Python API. Where
can I find a similar way to parse JSON files using Spark?

Another question: when using Spark SQL, how can I easily save the results into a
NoSQL DB? Any examples? Thanks a lot!



--
Best regards!

Lin,Cui






Re: Spark on Mesos with Jobs in Cluster Mode Documentation

2015-09-21 Thread Alan Braithwaite
That could be the behavior, but spark.mesos.executor.home being unset still
raises an exception inside the dispatcher, preventing a Docker container from even
being started.  I can check whether other properties are inherited from the
default environment when that's set, if you'd like.

I think the main problem is just that premature validation is being done on
the dispatcher and the dispatcher crashing in the event of bad config.

- Alan

On Sat, Sep 19, 2015 at 11:03 AM, Timothy Chen  wrote:

> You can still provide properties through the docker container by putting
> configuration in the conf directory, but we try to pass all properties
> submitted from the driver spark-submit through which I believe will
> override the defaults.
>
> This is not what you are seeing?
>
> Tim
>
>
> On Sep 19, 2015, at 9:01 AM, Alan Braithwaite  wrote:
>
> The assumption that the executor has no default properties set in it's
> environment through the docker container.  Correct me if I'm wrong, but any
> properties which are unset in the SparkContext will come from the
> environment of the executor will it not?
>
> Thanks,
> - Alan
>
> On Sat, Sep 19, 2015 at 1:09 AM, Tim Chen  wrote:
>
>> I guess I need a bit more clarification, what kind of assumptions was the
>> dispatcher making?
>>
>> Tim
>>
>>
>> On Thu, Sep 17, 2015 at 10:18 PM, Alan Braithwaite 
>> wrote:
>>
>>> Hi Tim,
>>>
>>> Thanks for the follow up.  It's not so much that I expect the executor
>>> to inherit the configuration of the dispatcher as I* don't *expect the
>>> dispatcher to make assumptions about the system environment of the executor
>>> (since it lives in a docker).  I could potentially see a case where you
>>> might want to explicitly forbid the defaults, but I can't think of any
>>> right now.
>>>
>>> Otherwise, I'm confused as to why the defaults in the docker image for
>>> the executor are just ignored.  I suppose that it's the dispatchers job to
>>> ensure the *exact* configuration of the executor, regardless of the
>>> defaults set on the executors machine?  Is that the assumption being made?
>>> I can understand that in contexts which aren't docker driven since jobs
>>> could be rolling out in the middle of a config update.  Trying to think of
>>> this outside the terms of just mesos/docker (since I'm fully aware that
>>> docker doesn't rule the world yet).
>>>
>>> So I can see this from both perspectives now and passing in the
>>> properties file will probably work just fine for me, but for my better
>>> understanding: When the executor starts, will it read any of the
>>> environment that it's executing in or will it just take only the properties
>>> given to it by the dispatcher and nothing more?
>>>
>>> Lemme know if anything needs more clarification and thanks for your
>>> mesos contribution to spark!
>>>
>>> - Alan
>>>
>>> On Thu, Sep 17, 2015 at 5:03 PM, Timothy Chen  wrote:
>>>
 Hi Alan,

 If I understand correctly, you are setting executor home when you
 launch the dispatcher and not on the configuration when you submit job, and
 expect it to inherit that configuration?

 When I worked on the dispatcher I was assuming all configuration is
 passed to the dispatcher to launch the job exactly how you will need to
 launch it with client mode.

 But indeed it shouldn't crash dispatcher, I'll take a closer look when
 I get a chance.

 Can you recommend changes on the documentation, either in email or a PR?

 Thanks!

 Tim

 Sent from my iPhone

 On Sep 17, 2015, at 12:29 PM, Alan Braithwaite 
 wrote:

 Hey All,

 To bump this thread once again, I'm having some trouble using the
 dispatcher as well.

 I'm using Mesos Cluster Manager with Docker Executors.  I've deployed
 the dispatcher as Marathon job.  When I submit a job using spark submit,
 the dispatcher writes back that the submission was successful and then
 promptly dies in marathon.  Looking at the logs reveals it was hitting the
 following line:

 398:  throw new SparkException("Executor Spark home
 `spark.mesos.executor.home` is not set!")

 Which is odd because it's set in multiple places (SPARK_HOME,
 spark.mesos.executor.home, spark.home, etc).  Reading the code, it
 appears that the driver desc pulls only from the request and disregards any
 other properties that may be configured.  Testing by passing --conf
 spark.mesos.executor.home=/usr/local/spark on the command line to
 spark-submit confirms this.  We're trying to isolate the number of places
 where we have to set properties within spark and were hoping that it will
 be possible to have this pull in the spark-defaults.conf from somewhere, or
 at least allow the user to inform the dispatcher through spark-submit that
 those 

Serialization Error with PartialFunction / immutable sets

2015-09-21 Thread Chaney Courtney
Hi, I’m receiving a task not serializable exception using Spark GraphX (Scala 
2.11.6 / JDK 1.8 / Spark 1.5)

My vertex data is of type (VertexId, immutable Set), 
My edge data is of type PartialFunction[ISet[E], ISet[E]] where each ED has a 
precomputed function.

My vertex program:
    val vertexProgram = (id: VertexId, currentSet: ISet[E], inSet: ISet[E]) => inSet  // identity

My send message:
    val sendMessage: (EdgeTriplet[ISet[E], MonotonicTransferFunction]) => Iterator[(VertexId, ISet[E])] =
      (edge) => {
        val f = edge.attr
        val currentSet = edge.srcAttr
        Iterator((edge.dstId, f(currentSet)))
      }

My message combiner:
    val messageCombiner: (ISet[E], ISet[E]) => ISet[E] =
      (a, b) => a ++ b

    g.pregel(bottom, Int.MaxValue, EdgeDirection.Out)(vertexProgram, sendMessage, messageCombiner)

I debugged the pregel execution and found that the exception happened when 
pregel calls mapReduceTriplets to aggregate the messages for the first time. 
This happens after the initial vertex program is run I believe (which does not 
cause an exception). I think the error lies within my send/combiner functions 
but I am not sure. I’ve also tried storing the PartialFunctions inside of the 
VD instead and still get the same error. At first I thought the error might 
have to do with Set and how it changes size throughout execution, but I have 
successfully ran other Pregel projects using immutable sets without issue…

I have also tried enclosing each method within its own class that extends 
Serializable but this still gives me the same error.

Thank you for your time and information.



Re: Deploying spark-streaming application on production

2015-09-21 Thread Adrian Tanase
I'm wondering, isn't this the canonical use case for WAL + reliable receiver?

As far as I know you can tune the MQTT server to wait for an ack on messages (QoS
level 2?).
With some support from the client library you could achieve exactly-once
semantics on the read side, if you ack a message only after writing it to the WAL,
correct?
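
(A minimal sketch of turning the WAL on for a receiver-based stream - the config
key exists in Spark 1.5; the batch interval and checkpoint path are just placeholders:)

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    val conf = new SparkConf()
      .setAppName("mqtt-wal-sketch")
      .set("spark.streaming.receiver.writeAheadLog.enable", "true")  // write received blocks to the WAL

    val ssc = new StreamingContext(conf, Seconds(10))
    ssc.checkpoint("hdfs:///checkpoints/mqtt-app")  // the WAL lives under the checkpoint directory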

-adrian

Sent from my iPhone

On 21 Sep 2015, at 12:35, Petr Novak 
> wrote:

In short, there is no direct support for it in Spark AFAIK. You will either
manage it in MQTT or have to add another layer of indirection - either
in-memory based (observable streams, in-mem db) or disk based (Kafka, HDFS
files, db) - which will keep your unprocessed events.

I'm now realizing there is support for backpressure in v1.5.0, but I don't know if
it could be exploited here, i.e. I don't know if it is possible to decouple reading
events into memory from the actual processing code in Spark so that the latter could be swapped
on the fly. Probably not without some custom-built facility for it.

Petr

On Mon, Sep 21, 2015 at 11:26 AM, Petr Novak 
> wrote:
I should read my posts at least once to avoid so many typos. Hopefully you are 
brave enough to read through.

Petr

On Mon, Sep 21, 2015 at 11:23 AM, Petr Novak 
> wrote:
I think you would have to persist events somehow if you don't want to miss 
them. I don't see any other option there. Either in MQTT if it is supported 
there or routing them through Kafka.

There is a WriteAheadLog in Spark, but you would have to decouple MQTT stream reading
and processing into 2 separate jobs so that you could upgrade the processing one,
assuming the reading one stays stable (without changes) across versions. But
it is problematic because there is no easy way to share DStreams between
jobs - you would have to develop your own facility for it.

Alternatively, the reading job could save MQTT events in their most raw
form into files - to limit the need to change code - and then the processing job
would work on top of them using file-based Spark streaming. I think this is
inefficient and can get quite complex if you would like to make it reliable.

Basically, either MQTT supports persistence (which I don't know) or there is
Kafka for these use cases.

Another option would be, I think, to place observable streams with backpressure
between MQTT and Spark streaming, so that you could perform the upgrade before the
buffers fill up.

I'm sorry that it is not thought out well from my side, it is just a brainstorm 
but it might lead you somewhere.

Regards,
Petr

On Mon, Sep 21, 2015 at 10:09 AM, Jeetendra Gangele 
> wrote:
Hi All,

I have a Spark streaming application with a 10 ms batch interval which is reading the
MQTT channel and dumping the data from MQTT to HDFS.

So suppose I have to deploy a new application jar (with changes to the Spark
streaming application); what is the best way to deploy? Currently I am doing it as
below:

1. killing the running streaming app using yarn application -kill ID
2. then starting the application again

The problem with the above approach is that, since we are not persisting the events in MQTT,
we will miss the events for the duration of the deploy.

how to handle this case?

regards
jeeetndra




