Spark Configuration of spark.worker.cleanup.appDataTtl

2015-06-15 Thread luohui20001
Hi guys:   I added the parameter "spark.worker.cleanup.appDataTtl   3 * 24 *
3600" to my conf/spark-defaults.conf and then started my Spark cluster. However,
I got an exception:
15/06/16 14:25:14 INFO util.Utils: Successfully started service 'sparkWorker' 
on port 43344.
15/06/16 14:25:14 ERROR actor.OneForOneStrategy: exception during creation
akka.actor.ActorInitializationException: exception during creation
at akka.actor.ActorInitializationException$.apply(Actor.scala:164)
at akka.actor.ActorCell.create(ActorCell.scala:596)
at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:456)
at akka.actor.ActorCell.systemInvoke(ActorCell.scala:478)
at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:263)
at akka.dispatch.Mailbox.run(Mailbox.scala:219)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.reflect.InvocationTargetException
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:422)
at akka.util.Reflect$.instantiate(Reflect.scala:66)
at akka.actor.ArgsReflectConstructor.produce(Props.scala:352)
at akka.actor.Props.newActor(Props.scala:252)
at akka.actor.ActorCell.newActor(ActorCell.scala:552)
at akka.actor.ActorCell.create(ActorCell.scala:578)
... 9 more
Caused by: java.lang.NumberFormatException: For input string: "3 * 24 * 3600"
at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
at java.lang.Long.parseLong(Long.java:589)
at java.lang.Long.parseLong(Long.java:631)
at scala.collection.immutable.StringLike$class.toLong(StringLike.scala:230)
at scala.collection.immutable.StringOps.toLong(StringOps.scala:31)
at org.apache.spark.SparkConf$$anonfun$getLong$2.apply(SparkConf.scala:194)
at org.apache.spark.SparkConf$$anonfun$getLong$2.apply(SparkConf.scala:194)
at scala.Option.map(Option.scala:145)
at org.apache.spark.SparkConf.getLong(SparkConf.scala:194)
at org.apache.spark.deploy.worker.Worker.<init>(Worker.scala:89)
... 18 more

How to set this parameter correctly?
BTW, I searched for this property in
http://spark.apache.org/docs/latest/configuration.html and got no match. The
property is documented in http://spark.apache.org/docs/latest/spark-standalone.html
with a default value of "7 * 24 * 3600 (7 days)", which I also tried, but that
failed as well. Thanks
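
My current guess, based on the NumberFormatException above, is that the value
has to be a plain integer number of seconds rather than an arithmetic
expression, and that the cleanup only runs when spark.worker.cleanup.enabled is
set to true (it defaults to false). A sketch of what I will try next
(259200 seconds = 3 days):

spark.worker.cleanup.enabled true
spark.worker.cleanup.appDataTtl 259200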


 

Thanks&Best regards!
San.Luo


Re: What are the likely causes of org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle?

2015-06-15 Thread Jia Yu
Hi Peng,

I got exactly the same error! My shuffle data is also very large. Have you
figured out a way to solve it?
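
For reference, the two settings Peng names explicitly below map to
spark-defaults.conf entries like these (the other items in the list are
described only loosely, so I have not tried to guess their property names):

spark.shuffle.memoryFraction 0.4
spark.serializer org.apache.spark.serializer.KryoSerializer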

Thanks,
Jia

On Fri, Apr 24, 2015 at 7:59 AM, Peng Cheng  wrote:

> I'm deploying a Spark data processing job on an EC2 cluster. The job is small
> for the cluster (16 cores with 120G RAM in total), and the largest RDD has only
> 76k+ rows, but it is heavily skewed in the middle (thus requiring repartitioning)
> and each row has around 100k of data after serialization. The job always gets
> stuck in repartitioning; namely, it constantly hits the following errors and
> retries:
>
> org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output
> location for shuffle
>
> org.apache.spark.shuffle.FetchFailedException: Error in opening
> FileSegmentManagedBuffer
>
> org.apache.spark.shuffle.FetchFailedException:
> java.io.FileNotFoundException: /tmp/spark-...
> I've tried to identify the problem, but it seems like both memory and disk
> consumption of the machines throwing these errors are below 50%. I've also
> tried different configurations, including:
>
> let driver/executor memory use 60% of total memory.
> let Netty prioritize the JVM shuffling buffer.
> increase the shuffle streaming buffer to 128m.
> use KryoSerializer and max out all buffers.
> increase the shuffle memoryFraction to 0.4.
> But none of them works. The small job always triggers the same series of
> errors and maxes out retries (up to 1000 times). How can I troubleshoot this
> in such a situation?
>
> Thanks a lot if you have any clue.
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/What-are-the-likely-causes-of-org-apache-spark-shuffle-MetadataFetchFailedException-Missing-an-outpu-tp22646.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: RDD of Iterable[String]

2015-06-15 Thread nir
Have you found an answer to this? I am also looking for exactly the same solution.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/RDD-of-Iterable-String-tp15016p23329.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Help!!! Map or join one large dataset, then suddenly remote Akka client disassociated

2015-06-15 Thread Jia Yu
Hi folks,

Help me! I ran into a very weird problem. I really need some help!! Here is my
situation:

Case: Assign keys to two datasets (one is 96GB with 2.7 billion records and
one is 1.5GB with 30k records) via mapPartitions first, then join them together
on their keys.

Environment:

Standalone Spark on Amazon EC2
Master*1 13GB 8 cores
Worker*16  each one 13GB 8 cores


(After hitting this problem, I switched to
Worker*16  each one 59GB 8 cores)


Read and write on HDFS (same cluster)
--
Problem:

At the beginning:---

The mapPartitions step looks fine. But when Spark does the join of the two
datasets, the console says

*"ERROR TaskSchedulerImpl: Lost executor 4 on
ip-172-31-27-174.us-west-2.compute.internal: remote Akka client
disassociated"*

Then I go back to this worker and check its log

There is something like "Master said remote Akka client disassociated and
asked to kill executor *** and then the worker killed this executor"

(Sorry I deleted that log and just remember the content.)

There are no other errors before the Akka client disassociated (for both
master and worker).

Then ---

I tried one 62GB dataset with the 1.5GB dataset. My job worked
smoothly. *HOWEVER, I found one thing: if I set spark.shuffle.memoryFraction
to zero, the same error happens on this 62GB dataset.*

Then ---

I switched my workers to Worker*16, each one 59GB 8 cores. The error even
happened while Spark was doing the mapPartitions.

Some metrics I
found

*When I do the mapPartitions or join with the 96GB data, its shuffle write is
around 100GB. And when I cache the 96GB data, its cached size is around 530GB.*

*Garbage collection time for the 96GB dataset when Spark does the map or join
is around 12 seconds.*

My analysis--

This problem might be caused by the large shuffle write. The large shuffle
write causes high I/O on disk, and if the shuffle write cannot finish within
some timeout period, the master considers the executor disassociated.

But I don't know how to solve this problem.
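
If the timeout theory is right, one thing I may try is raising the network and
ack timeouts so that slow shuffle writes do not get the executor declared lost.
The property names below are my assumption for Spark 1.x and I have not
verified them against my version:

spark.network.timeout 300s
spark.core.connection.ack.wait.timeout 600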

---


Any help will be appreciated!!!

Thanks,
Jia


Re: Spark DataFrame 1.4 write to parquet/saveAsTable tasks fail

2015-06-15 Thread Night Wolf
Hey Yin,

Thanks for the link to the JIRA. I'll add details to it. I'm able to
reproduce it, at least in the same shell session; every time I do a write, I
get a random number of tasks failing on the first run with the NPE.

We're using dynamic allocation of executors in YARN mode. Speculative
execution is not enabled.

On Tue, Jun 16, 2015 at 3:11 PM, Yin Huai  wrote:

> I saw it once but I was not clear how to reproduce it. The jira I created
> is https://issues.apache.org/jira/browse/SPARK-7837.
>
> More information will be very helpful. Were those errors from speculative
> tasks or regular tasks (the first attempt of the task)? Is this error
> deterministic (can you reproduce every time you run this command)?
>
> Thanks,
>
> Yin
>
> On Mon, Jun 15, 2015 at 8:59 PM, Night Wolf 
> wrote:
>
>> Looking at the logs of the executor, looks like it fails to find the
>> file; e.g. for task 10323.0
>>
>>
>> 15/06/16 13:43:13 ERROR output.FileOutputCommitter: Hit IOException
>> trying to rename
>> maprfs:///user/hive/warehouse/is_20150617_test2/_temporary/_attempt_201506161340__m_010181_0/part-r-353626.gz.parquet
>> to maprfs:/user/hive/warehouse/is_20150617_test2/part-r-353626.gz.parquet
>> java.io.IOException: Invalid source or target
>> at com.mapr.fs.MapRFileSystem.rename(MapRFileSystem.java:952)
>> at
>> org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.moveTaskOutputs(FileOutputCommitter.java:201)
>> at
>> org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.moveTaskOutputs(FileOutputCommitter.java:225)
>> at
>> org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitTask(FileOutputCommitter.java:167)
>> at
>> org.apache.spark.mapred.SparkHadoopMapRedUtil$.performCommit$1(SparkHadoopMapRedUtil.scala:100)
>> at
>> org.apache.spark.mapred.SparkHadoopMapRedUtil$.commitTask(SparkHadoopMapRedUtil.scala:137)
>> at
>> org.apache.spark.sql.sources.BaseWriterContainer.commitTask(commands.scala:357)
>> at
>> org.apache.spark.sql.sources.DefaultWriterContainer.commitTask(commands.scala:394)
>> at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation.org
>> $apache$spark$sql$sources$InsertIntoHadoopFsRelation$$writeRows$1(commands.scala:157)
>> at
>> org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insert$1.apply(commands.scala:132)
>> at
>> org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insert$1.apply(commands.scala:132)
>> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
>> at org.apache.spark.scheduler.Task.run(Task.scala:70)
>> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>> at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>> at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>> at java.lang.Thread.run(Thread.java:745)
>> 15/06/16 13:43:13 ERROR mapred.SparkHadoopMapRedUtil: Error committing
>> the output of task: attempt_201506161340__m_010181_0
>> java.io.IOException: Invalid source or target
>> at com.mapr.fs.MapRFileSystem.rename(MapRFileSystem.java:952)
>> at
>> org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.moveTaskOutputs(FileOutputCommitter.java:201)
>> at
>> org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.moveTaskOutputs(FileOutputCommitter.java:225)
>> at
>> org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitTask(FileOutputCommitter.java:167)
>> at
>> org.apache.spark.mapred.SparkHadoopMapRedUtil$.performCommit$1(SparkHadoopMapRedUtil.scala:100)
>> at
>> org.apache.spark.mapred.SparkHadoopMapRedUtil$.commitTask(SparkHadoopMapRedUtil.scala:137)
>> at
>> org.apache.spark.sql.sources.BaseWriterContainer.commitTask(commands.scala:357)
>> at
>> org.apache.spark.sql.sources.DefaultWriterContainer.commitTask(commands.scala:394)
>> at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation.org
>> $apache$spark$sql$sources$InsertIntoHadoopFsRelation$$writeRows$1(commands.scala:157)
>> at
>> org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insert$1.apply(commands.scala:132)
>> at
>> org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insert$1.apply(commands.scala:132)
>> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
>> at org.apache.spark.scheduler.Task.run(Task.scala:70)
>> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>> at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>> at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>> at java.lang.Thread.run(Thread.java:745)
>> 15/06/16 13:43:16 ERROR output.FileOutputCommitter: Hit IOException
>> trying to rename
>> maprfs:///user/hive/warehouse/is_20150617_test2/_temporary/_attempt_201506161341__m_010323_0/part-r-353768.gz.parquet
>> to maprfs:/user/hive/warehouse/is_20150617_test2/part-r-353768.gz.parquet
>> java.io.IOException: Invalid source or target
>> at com.mapr.fs.MapRFileSystem.rename(MapRFi

Re: How does one decide no of executors/cores/memory allocation?

2015-06-15 Thread gaurav sharma
When you submit a job, Spark breaks it down into stages, as per the DAG. The
stages run transformations or actions on the RDDs. Each RDD consists of N
partitions. The number of tasks Spark creates to execute a stage is equal to
the number of partitions. Every task is executed on the cores utilized by
the executors in your cluster.

--conf spark.cores.max=24 defines the maximum number of cores you want to
utilize. Spark itself distributes that number of cores among the workers.

More partitions and more available cores -> a higher level of parallelism ->
better performance.
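
For example (a rough sketch; the input path and numbers are only illustrative):

// submitted with: spark-submit --conf spark.cores.max=24 ...
val data = sc.textFile("hdfs:///path/to/input")   // example path
val repartitioned = data.repartition(96)          // 96 partitions -> 96 tasks per stage
val counts = repartitioned.map(line => (line.length, 1)).reduceByKey(_ + _)
counts.count()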

On Tue, Jun 16, 2015 at 9:27 AM, shreesh  wrote:

> How do I decide how many partitions to break my data into, and how many
> executors I should have? I guess memory and cores will be allocated based on
> the number of executors I have.
> Thanks
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/How-does-one-decide-no-of-executors-cores-memory-allocation-tp23326.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Spark DataFrame 1.4 write to parquet/saveAsTable tasks fail

2015-06-15 Thread Yin Huai
I saw it once but I was not clear how to reproduce it. The jira I created
is https://issues.apache.org/jira/browse/SPARK-7837.

More information will be very helpful. Were those errors from speculative
tasks or regular tasks (the first attempt of the task)? Is this error
deterministic (can you reproduce every time you run this command)?

Thanks,

Yin

On Mon, Jun 15, 2015 at 8:59 PM, Night Wolf  wrote:

> Looking at the logs of the executor, looks like it fails to find the file;
> e.g. for task 10323.0
>
>
> 15/06/16 13:43:13 ERROR output.FileOutputCommitter: Hit IOException trying
> to rename
> maprfs:///user/hive/warehouse/is_20150617_test2/_temporary/_attempt_201506161340__m_010181_0/part-r-353626.gz.parquet
> to maprfs:/user/hive/warehouse/is_20150617_test2/part-r-353626.gz.parquet
> java.io.IOException: Invalid source or target
> at com.mapr.fs.MapRFileSystem.rename(MapRFileSystem.java:952)
> at
> org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.moveTaskOutputs(FileOutputCommitter.java:201)
> at
> org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.moveTaskOutputs(FileOutputCommitter.java:225)
> at
> org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitTask(FileOutputCommitter.java:167)
> at
> org.apache.spark.mapred.SparkHadoopMapRedUtil$.performCommit$1(SparkHadoopMapRedUtil.scala:100)
> at
> org.apache.spark.mapred.SparkHadoopMapRedUtil$.commitTask(SparkHadoopMapRedUtil.scala:137)
> at
> org.apache.spark.sql.sources.BaseWriterContainer.commitTask(commands.scala:357)
> at
> org.apache.spark.sql.sources.DefaultWriterContainer.commitTask(commands.scala:394)
> at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation.org
> $apache$spark$sql$sources$InsertIntoHadoopFsRelation$$writeRows$1(commands.scala:157)
> at
> org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insert$1.apply(commands.scala:132)
> at
> org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insert$1.apply(commands.scala:132)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
> at org.apache.spark.scheduler.Task.run(Task.scala:70)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> at java.lang.Thread.run(Thread.java:745)
> 15/06/16 13:43:13 ERROR mapred.SparkHadoopMapRedUtil: Error committing the
> output of task: attempt_201506161340__m_010181_0
> java.io.IOException: Invalid source or target
> at com.mapr.fs.MapRFileSystem.rename(MapRFileSystem.java:952)
> at
> org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.moveTaskOutputs(FileOutputCommitter.java:201)
> at
> org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.moveTaskOutputs(FileOutputCommitter.java:225)
> at
> org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitTask(FileOutputCommitter.java:167)
> at
> org.apache.spark.mapred.SparkHadoopMapRedUtil$.performCommit$1(SparkHadoopMapRedUtil.scala:100)
> at
> org.apache.spark.mapred.SparkHadoopMapRedUtil$.commitTask(SparkHadoopMapRedUtil.scala:137)
> at
> org.apache.spark.sql.sources.BaseWriterContainer.commitTask(commands.scala:357)
> at
> org.apache.spark.sql.sources.DefaultWriterContainer.commitTask(commands.scala:394)
> at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation.org
> $apache$spark$sql$sources$InsertIntoHadoopFsRelation$$writeRows$1(commands.scala:157)
> at
> org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insert$1.apply(commands.scala:132)
> at
> org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insert$1.apply(commands.scala:132)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
> at org.apache.spark.scheduler.Task.run(Task.scala:70)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> at java.lang.Thread.run(Thread.java:745)
> 15/06/16 13:43:16 ERROR output.FileOutputCommitter: Hit IOException trying
> to rename
> maprfs:///user/hive/warehouse/is_20150617_test2/_temporary/_attempt_201506161341__m_010323_0/part-r-353768.gz.parquet
> to maprfs:/user/hive/warehouse/is_20150617_test2/part-r-353768.gz.parquet
> java.io.IOException: Invalid source or target
> at com.mapr.fs.MapRFileSystem.rename(MapRFileSystem.java:952)
> at
> org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.moveTaskOutputs(FileOutputCommitter.java:201)
> at
> org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.moveTaskOutputs(FileOutputCommitter.java:225)
> at
> org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitTask(FileOutputCommitter.java:167)
> at
> org.apache.spark.mapred.SparkHadoopMapRedUtil$.performCommit$1(SparkHadoopMapRedUtil.scala:100)
> at
> org.apac

ALS predictALL not completing

2015-06-15 Thread afarahat
Hello,
I have a data set of about 80 million users and 12,000 items (very sparse).
I can get the training part working with no problem (the model has 20 factors).
However, when I try using predictAll for 80 million users x 10 items, the job
does not complete.
When I use a smaller data set, say 500k or a million, it completes.
Any ideas or suggestions?
Thanks
Ayman



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/ALS-predictALL-not-completing-tp23327.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark DataFrame 1.4 write to parquet/saveAsTable tasks fail

2015-06-15 Thread Night Wolf
Looking at the logs of the executor, it looks like it fails to find the file;
e.g., for task 10323.0:


15/06/16 13:43:13 ERROR output.FileOutputCommitter: Hit IOException trying
to rename
maprfs:///user/hive/warehouse/is_20150617_test2/_temporary/_attempt_201506161340__m_010181_0/part-r-353626.gz.parquet
to maprfs:/user/hive/warehouse/is_20150617_test2/part-r-353626.gz.parquet
java.io.IOException: Invalid source or target
at com.mapr.fs.MapRFileSystem.rename(MapRFileSystem.java:952)
at
org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.moveTaskOutputs(FileOutputCommitter.java:201)
at
org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.moveTaskOutputs(FileOutputCommitter.java:225)
at
org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitTask(FileOutputCommitter.java:167)
at
org.apache.spark.mapred.SparkHadoopMapRedUtil$.performCommit$1(SparkHadoopMapRedUtil.scala:100)
at
org.apache.spark.mapred.SparkHadoopMapRedUtil$.commitTask(SparkHadoopMapRedUtil.scala:137)
at
org.apache.spark.sql.sources.BaseWriterContainer.commitTask(commands.scala:357)
at
org.apache.spark.sql.sources.DefaultWriterContainer.commitTask(commands.scala:394)
at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation.org
$apache$spark$sql$sources$InsertIntoHadoopFsRelation$$writeRows$1(commands.scala:157)
at
org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insert$1.apply(commands.scala:132)
at
org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insert$1.apply(commands.scala:132)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
at org.apache.spark.scheduler.Task.run(Task.scala:70)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
15/06/16 13:43:13 ERROR mapred.SparkHadoopMapRedUtil: Error committing the
output of task: attempt_201506161340__m_010181_0
java.io.IOException: Invalid source or target
at com.mapr.fs.MapRFileSystem.rename(MapRFileSystem.java:952)
at
org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.moveTaskOutputs(FileOutputCommitter.java:201)
at
org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.moveTaskOutputs(FileOutputCommitter.java:225)
at
org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitTask(FileOutputCommitter.java:167)
at
org.apache.spark.mapred.SparkHadoopMapRedUtil$.performCommit$1(SparkHadoopMapRedUtil.scala:100)
at
org.apache.spark.mapred.SparkHadoopMapRedUtil$.commitTask(SparkHadoopMapRedUtil.scala:137)
at
org.apache.spark.sql.sources.BaseWriterContainer.commitTask(commands.scala:357)
at
org.apache.spark.sql.sources.DefaultWriterContainer.commitTask(commands.scala:394)
at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation.org
$apache$spark$sql$sources$InsertIntoHadoopFsRelation$$writeRows$1(commands.scala:157)
at
org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insert$1.apply(commands.scala:132)
at
org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insert$1.apply(commands.scala:132)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
at org.apache.spark.scheduler.Task.run(Task.scala:70)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
15/06/16 13:43:16 ERROR output.FileOutputCommitter: Hit IOException trying
to rename
maprfs:///user/hive/warehouse/is_20150617_test2/_temporary/_attempt_201506161341__m_010323_0/part-r-353768.gz.parquet
to maprfs:/user/hive/warehouse/is_20150617_test2/part-r-353768.gz.parquet
java.io.IOException: Invalid source or target
at com.mapr.fs.MapRFileSystem.rename(MapRFileSystem.java:952)
at
org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.moveTaskOutputs(FileOutputCommitter.java:201)
at
org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.moveTaskOutputs(FileOutputCommitter.java:225)
at
org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitTask(FileOutputCommitter.java:167)
at
org.apache.spark.mapred.SparkHadoopMapRedUtil$.performCommit$1(SparkHadoopMapRedUtil.scala:100)
at
org.apache.spark.mapred.SparkHadoopMapRedUtil$.commitTask(SparkHadoopMapRedUtil.scala:137)
at
org.apache.spark.sql.sources.BaseWriterContainer.commitTask(commands.scala:357)
at
org.apache.spark.sql.sources.DefaultWriterContainer.commitTask(commands.scala:394)
at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation.org
$apache$spark$sql$sources$InsertIntoHadoopFsRelation$$writeRows$1(commands.scala:157)
at
org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insert$1.apply(commands.scala:132)
at
org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$inser

How does one decide no of executors/cores/memory allocation?

2015-06-15 Thread shreesh
How do I decide how many partitions to break my data into, and how many
executors I should have? I guess memory and cores will be allocated based on
the number of executors I have.
Thanks



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-does-one-decide-no-of-executors-cores-memory-allocation-tp23326.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Spark DataFrame 1.4 write to parquet/saveAsTable tasks fail

2015-06-15 Thread Night Wolf
Hi guys,

Using Spark 1.4, I'm trying to save a DataFrame as a table (a really simple
test), but I'm getting a bunch of NPEs.

The code I'm running is very simple:

 
qc.read.parquet("/user/sparkuser/data/staged/item_sales_basket_id.parquet").write.format("parquet").saveAsTable("is_20150617_test2")

Logs of lost tasks:

[Stage 0:=>(8771 + 450) /
13000]15/06/16 03:42:30 WARN TaskSetManager: Lost task 10681.0 in stage 0.0
(TID 8757, qtausc-pphd0146): java.lang.NullPointerException
at parquet.hadoop.InternalParquetRecordWriter.flushRowGroupToStore(InternalParquetRecordWriter.java:146)
at parquet.hadoop.InternalParquetRecordWriter.close(InternalParquetRecordWriter.java:112)
at parquet.hadoop.ParquetRecordWriter.close(ParquetRecordWriter.java:73)
at org.apache.spark.sql.parquet.ParquetOutputWriter.close(newParquet.scala:116)
at org.apache.spark.sql.sources.DefaultWriterContainer.abortTask(commands.scala:404)
at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation.org$apache$spark$sql$sources$InsertIntoHadoopFsRelation$$writeRows$1(commands.scala:160)
at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insert$1.apply(commands.scala:132)
at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insert$1.apply(commands.scala:132)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
at org.apache.spark.scheduler.Task.run(Task.scala:70)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)

[Stage 0:==>   (9006 + 490) /
13000]15/06/16 03:43:22 WARN TaskSetManager: Lost task 10323.0 in stage 0.0
(TID 8896, qtausc-pphd0167): java.lang.NullPointerException
at parquet.hadoop.InternalParquetRecordWriter.flushRowGroupToStore(InternalParquetRecordWriter.java:146)
at parquet.hadoop.InternalParquetRecordWriter.close(InternalParquetRecordWriter.java:112)
at parquet.hadoop.ParquetRecordWriter.close(ParquetRecordWriter.java:73)
at org.apache.spark.sql.parquet.ParquetOutputWriter.close(newParquet.scala:116)
at org.apache.spark.sql.sources.DefaultWriterContainer.abortTask(commands.scala:404)
at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation.org$apache$spark$sql$sources$InsertIntoHadoopFsRelation$$writeRows$1(commands.scala:160)
at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insert$1.apply(commands.scala:132)
at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insert$1.apply(commands.scala:132)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
at org.apache.spark.scheduler.Task.run(Task.scala:70)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)


Creating RDD from Iterable from groupByKey results

2015-06-15 Thread Nirav Patel
I am trying to create a new RDD based on a given PairRDD. I have a PairRDD with
few keys, but each key has a large number (about 100k) of values. I want to
somehow repartition and turn each `Iterable` into an RDD[v] so that I can
further apply map, reduce, sortBy etc. effectively on those values. I sense that
flatMapValues is my friend but want to check with other Spark users. This is
for a real-time Spark app. I have already tried collect() and computing all
measures in-memory on the app server, but I am trying to improve on that.
This is what I tried (pseudocode):

class ComputeMetrices {
  transient JavaSparkContext sparkContext;

  public Map computeMetrices(JavaPairRdd javaPairRdd) {
    javaPairRdd.groupByKey(10).mapValues(itr => {
      sparkContext.parallelize(list(itr)) // null pointer; probably at sparkContext
    })
  }
}

I want to create an RDD out of that Iterable from the groupByKey result so that
I can use further Spark transformations on it.
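
One alternative I am weighing (only a sketch with made-up data, since nested
RDDs are not supported and sparkContext cannot be used inside executors) is to
keep the per-key computation inside ordinary transformations, e.g. in Scala:

// Sketch only: compute per-key measures without building an RDD per key
val pairs = sc.parallelize(Seq(("a", 1.0), ("a", 2.0), ("b", 3.0)))  // made-up data
val metrics = pairs
  .groupByKey(10)
  .mapValues { values =>
    val sorted = values.toSeq.sorted    // sort one key's values locally
    (sorted.size, sorted.sum)           // e.g. count and sum per key
  }
metrics.collect()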

Thanks
Nir



Re: Spark job throwing “java.lang.OutOfMemoryError: GC overhead limit exceeded”

2015-06-15 Thread Deng Ching-Mallete
Hi Raj,

Since the number of executor cores is equivalent to the number of tasks that
can be executed in parallel in an executor, in effect the 6G of executor memory
configured for an executor is being shared by 6 tasks, on top of the memory
allocated for caching and task execution. I would suggest increasing
executor-memory, and adjusting it again if you're going to increase the number
of executor cores.

You might also want to adjust the memory allocation for caching and task
execution via the spark.storage.memoryFraction config. By default it is
set to 0.6 (60% of the memory is allocated for the cache). Lowering it to a
smaller fraction, say 0.4 or 0.3, would give you more available memory for
task execution.
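
For example (the numbers are only illustrative, not a recommendation for your
data):

spark-submit --num-executors 6 --executor-memory 10G --executor-cores 4 \
  --driver-memory 3G --conf spark.storage.memoryFraction=0.4 ...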

Hope this helps!

Thanks,
Deng

On Tue, Jun 16, 2015 at 3:09 AM, diplomatic Guru 
wrote:

> Hello All,
>
>
> I have a Spark job that throws "java.lang.OutOfMemoryError: GC overhead
> limit exceeded".
>
> The job is trying to process a filesize 4.5G.
>
> I've tried following spark configuration:
>
> --num-executors 6  --executor-memory 6G --executor-cores 6 --driver-memory 3G
>
> I tried increasing more cores and executors which sometime works, but
> takes over 20 minutes to process the file.
>
> Could I do something to improve the performance? or stop the Java Heap
> issue?
>
>
> Thank you.
>
>
> Best regards,
>
>
> Raj.
>


Re: flatmapping with other data

2015-06-15 Thread dizzy5112
Sorry, cut and paste error; the resulting data set I want is this:
({(101,S)=3},piece_of_data_1))
({(101,S)=3},piece_of_data_2))
({(101,S)=1},piece_of_data_3))
({(109,S)=2},piece_of_data_3))



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/flatmapping-with-other-data-tp23324p23325.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Limit Spark Shuffle Disk Usage

2015-06-15 Thread rahulkumar-aws
Check this link 
https://forums.databricks.com/questions/277/how-do-i-avoid-the-no-space-left-on-device-error.html

  

Hope this will solve your problem.



-
Software Developer
Sigmoid (SigmoidAnalytics), India

--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Limit-Spark-Shuffle-Disk-Usage-tp23279p23323.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark application in production without HDFS

2015-06-15 Thread rahulkumar-aws
Hi, if your data is not so huge you can use either Cloudera's or HDP's free
stack. Cloudera Express is 100% open source and free.



-
Software Developer
SigmoidAnalytics, Bangalore

--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-application-in-production-without-HDFS-tp23260p23322.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark application in production without HDFS

2015-06-15 Thread nsalian
Hi,

Spark on YARN should help in the memory management for Spark jobs.
Here is a good starting point:
https://spark.apache.org/docs/latest/running-on-yarn.html
YARN integrates well with HDFS and should be a good solution for a large
cluster.
What specific features are you looking for that HDFS does not satisfy?

Thank you.




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-application-in-production-without-HDFS-tp23260p23320.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Does spark performance really scale out with multiple machines?

2015-06-15 Thread William Briggs
I just wanted to clarify - when I said you hit your "maximum level of
parallelism", I meant that the default number of partitions might not be
large enough to take advantage of more hardware, not that there was no way
to increase your parallelism - the documentation I linked gives a few
suggestions on how to increase the number of partitions.
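
For example, something along these lines (illustrative only):

val rdd = sc.parallelize(1 to 1000000)                  // stand-in for your real data
val wider = rdd.repartition(sc.defaultParallelism * 4)  // spread work across more tasks
wider.map(_ * 2).count()
// Alternatively, raise the default used by shuffles when no partitioner is
// supplied, e.g. in spark-defaults.conf: spark.default.parallelism 64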

-Will

On Mon, Jun 15, 2015 at 5:00 PM, William Briggs  wrote:

> There are a lot of variables to consider. I'm not an expert on Spark, and
> my ML knowledge is rudimentary at best, but here are some questions whose
> answers might help us to help you:
>
>- What type of Spark cluster are you running (e.g., Stand-alone,
>Mesos, YARN)?
>- What does the HTTP UI tell you in terms of number of stages / tasks,
    >number of executors, and task execution time / memory used / amount of data
>shuffled over the network?
>
> As I said, I'm not all that familiar with the ML side of Spark, but in
> general, if I were adding more resources, and not seeing an improvement,
> here are a few things I would consider:
>
>1. Is your data set partitioned to allow the parallelism you are
>seeking? Spark's parallelism comes from processing RDD partitions in
>parallel, not processing individual RDD items in parallel; if you don't
>have enough partitions to take advantage of the extra hardware, you will
>see no benefit from adding capacity to your cluster.
>2. Do you have enough Spark executors to process your partitions in
>parallel? This depends on  your configuration and on your cluster type
>(doubtful this is an issue here, since you are adding more executors and
>seeing very little benefit).
>3. Are your partitions small enough (and/or your executor memory
>configuration large enough) so that each partition fits into the memory of
>an executor? If not, you will be constantly spilling to disk, which will
>have a severe impact on performance.
>4. Are you shuffling over the network? If so, how frequently and how
>much? Are you using efficient serialization (e.g., Kryo) and registering
>your serialized classes in order to minimize shuffle overhead?
>
> There are plenty more variables, and some very good performance tuning
> documentation  is
> available. Without any more information to go on, my best guess would be
> that you hit your maximum level of parallelism with the addition of the
> second node (and even that was not fully utilized), and thus you see no
> difference when adding a third node.
>
> Regards,
> Will
>
>
> On Mon, Jun 15, 2015 at 1:29 PM, Wang, Ningjun (LNG-NPV) <
> ningjun.w...@lexisnexis.com> wrote:
>
>>  I try to measure how spark standalone cluster performance scale out
>> with multiple machines. I did a test of training the SVM model which is
>> heavy in memory computation. I measure the run time for spark standalone
>> cluster of 1 – 3 nodes, the result is following
>>
>>
>>
>> 1 node: 35 minutes
>>
>> 2 nodes: 30.1 minutes
>>
>> 3 nodes: 30.8 minutes
>>
>>
>>
>> So the speed does not seems to increase much with more machines. I know
>> there are overhead for coordinating tasks among different machines. Seem to
>> me the overhead is over 30% of the total run time.
>>
>>
>>
>> Is this typical? Does anybody see significant performance increase with
>> more machines? Is there anything I can tune my spark cluster to make it
>> scale out with more machines?
>>
>>
>>
>> Thanks
>>
>> Ningjun
>>
>>
>>
>
>


Re: Does spark performance really scale out with multiple machines?

2015-06-15 Thread William Briggs
There are a lot of variables to consider. I'm not an expert on Spark, and
my ML knowledge is rudimentary at best, but here are some questions whose
answers might help us to help you:

   - What type of Spark cluster are you running (e.g., Stand-alone, Mesos,
   YARN)?
   - What does the HTTP UI tell you in terms of number of stages / tasks,
    number of executors, and task execution time / memory used / amount of data
   shuffled over the network?

As I said, I'm not all that familiar with the ML side of Spark, but in
general, if I were adding more resources, and not seeing an improvement,
here are a few things I would consider:

   1. Is your data set partitioned to allow the parallelism you are
   seeking? Spark's parallelism comes from processing RDD partitions in
   parallel, not processing individual RDD items in parallel; if you don't
   have enough partitions to take advantage of the extra hardware, you will
   see no benefit from adding capacity to your cluster.
   2. Do you have enough Spark executors to process your partitions in
   parallel? This depends on  your configuration and on your cluster type
   (doubtful this is an issue here, since you are adding more executors and
   seeing very little benefit).
   3. Are your partitions small enough (and/or your executor memory
   configuration large enough) so that each partition fits into the memory of
   an executor? If not, you will be constantly spilling to disk, which will
   have a severe impact on performance.
   4. Are you shuffling over the network? If so, how frequently and how
   much? Are you using efficient serialization (e.g., Kryo) and registering
   your serialized classes in order to minimize shuffle overhead?

There are plenty more variables, and some very good performance tuning
documentation  is
available. Without any more information to go on, my best guess would be
that you hit your maximum level of parallelism with the addition of the
second node (and even that was not fully utilized), and thus you see no
difference when adding a third node.

Regards,
Will


On Mon, Jun 15, 2015 at 1:29 PM, Wang, Ningjun (LNG-NPV) <
ningjun.w...@lexisnexis.com> wrote:

>  I try to measure how spark standalone cluster performance scale out with
> multiple machines. I did a test of training the SVM model which is heavy in
> memory computation. I measure the run time for spark standalone cluster of
> 1 – 3 nodes, the result is following
>
>
>
> 1 node: 35 minutes
>
> 2 nodes: 30.1 minutes
>
> 3 nodes: 30.8 minutes
>
>
>
> So the speed does not seems to increase much with more machines. I know
> there are overhead for coordinating tasks among different machines. Seem to
> me the overhead is over 30% of the total run time.
>
>
>
> Is this typical? Does anybody see significant performance increase with
> more machines? Is there anything I can tune my spark cluster to make it
> scale out with more machines?
>
>
>
> Thanks
>
> Ningjun
>
>
>


missing part of the file while using newHadoopApi

2015-06-15 Thread igor.berman
Hi
Has anyone experienced a problem when uploading to S3 with the s3n protocol and
Spark's newHadoopApi, where the job completes successfully (there is a _SUCCESS
marker), but in reality one of the parts of the file is missing?
Thanks in advance

PS: we are trying s3a now (which needs an upgrade to Hadoop 2.7)



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/missing-part-of-the-file-while-using-newHadoopApi-tp23318.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Problem: Custom Receiver for getting events from a Dynamic Queue

2015-06-15 Thread anshu shukla
I have written a custom receiver for converting the tuples in the dynamic
queue/EventGen into a DStream. But I don't know why it only processes data for
some time (3-4 sec.) and then shows the queue as empty. Any suggestions please?

--code //


public class JavaCustomReceiver extends Receiver<String> implements
ISyntheticEventGen {


EventGen eventGen;
BlockingQueue<List<String>> eventQueue;
String csvFileName;
String outSpoutCSVLogFileName;
double scalingFactor;

public JavaCustomReceiver(String csvFileName, String
outSpoutCSVLogFileName, double scalingFactor) {
super(StorageLevel.MEMORY_AND_DISK());

this.csvFileName = csvFileName;
this.outSpoutCSVLogFileName = outSpoutCSVLogFileName;
this.scalingFactor = scalingFactor;

this.eventGen = new EventGen(this,this.scalingFactor);
this.eventGen.launch(this.csvFileName,
this.outSpoutCSVLogFileName); //Launch threads


this.eventQueue = new LinkedBlockingQueue<List<String>>();
System.out.println("for watching queue");
}

public void onStart() {
// Start the thread that receives data over a connection
new Thread()  {
@Override public void run() {
receive();
}
}.start();
}

public void onStop() {
// There is nothing much to do as the thread calling receive()
// is designed to stop by itself if isStopped() returns false
}

/** Create a socket connection and receive data until receiver is stopped */
private void receive() {

try {
// connect to the server
//socket = new Socket(host, port);

//BufferedReader reader = new BufferedReader(new
InputStreamReader(socket.getInputStream()));

// Until stopped or connection broken continue reading
while (!isStopped() ) {
List<String> entry = this.eventQueue.take();

String str="";
for(String s:entry)
str+=s+",";
System.out.println("Received data '" + str + "'");
store(str);

}
// Restart in an attempt to connect again when server is
active again
restart("Trying to connect again");
}
catch(Throwable t) {
// restart if there is any other error
restart("Error receiving data", t);
}
}

@Override
public StorageLevel storageLevel() {
return StorageLevel.MEMORY_AND_DISK();
}


@Override
public void receive(List<String> event) {
// TODO Auto-generated method stub
//System.out.println("Called IN SPOUT### ");
try {
this.eventQueue.put(event);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}

}
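
For reference, this is roughly how I wire the receiver into the streaming
context (driver shown in Scala; the app name, file names and batch interval are
only placeholders):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val sparkConf = new SparkConf().setAppName("custom-receiver-sketch")
val ssc = new StreamingContext(sparkConf, Seconds(1))

// Each stored record is the comma-joined string built in receive()
val events = ssc.receiverStream(new JavaCustomReceiver("input.csv", "spout-log.csv", 1.0))
events.print()

ssc.start()
ssc.awaitTermination()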


-- 
Thanks & Regards,
Anshu Shukla


Spark job throwing “java.lang.OutOfMemoryError: GC overhead limit exceeded”

2015-06-15 Thread diplomatic Guru
Hello All,


I have a Spark job that throws "java.lang.OutOfMemoryError: GC overhead
limit exceeded".

The job is trying to process a file of size 4.5G.

I've tried following spark configuration:

--num-executors 6  --executor-memory 6G --executor-cores 6 --driver-memory 3G

I tried increasing the number of cores and executors, which sometimes works, but
then it takes over 20 minutes to process the file.

Could I do something to improve the performance? or stop the Java Heap
issue?


Thank you.


Best regards,


Raj.


akka configuration not found

2015-06-15 Thread Ritesh Kumar Singh
Hi,

Though my project has nothing to do with Akka, I'm getting this error:

Exception in thread "main" com.typesafe.config.ConfigException$Missing: No configuration setting found for key 'akka.version'
at com.typesafe.config.impl.SimpleConfig.findKey(SimpleConfig.java:124)
at com.typesafe.config.impl.SimpleConfig.find(SimpleConfig.java:145)
at com.typesafe.config.impl.SimpleConfig.find(SimpleConfig.java:151)
at com.typesafe.config.impl.SimpleConfig.find(SimpleConfig.java:159)
at com.typesafe.config.impl.SimpleConfig.find(SimpleConfig.java:164)
at com.typesafe.config.impl.SimpleConfig.getString(SimpleConfig.java:206)
at akka.actor.ActorSystem$Settings.<init>(ActorSystem.scala:168)
at akka.actor.ActorSystemImpl.<init>(ActorSystem.scala:504)
at akka.actor.ActorSystem$.apply(ActorSystem.scala:141)
at akka.actor.ActorSystem$.apply(ActorSystem.scala:118)
at org.apache.spark.util.AkkaUtils$.org$apache$spark$util$AkkaUtils$$doCreateActorSystem(AkkaUtils.scala:122)
at org.apache.spark.util.AkkaUtils$$anonfun$1.apply(AkkaUtils.scala:55)
at org.apache.spark.util.AkkaUtils$$anonfun$1.apply(AkkaUtils.scala:54)
at org.apache.spark.util.Utils$$anonfun$startServiceOnPort$1.apply$mcVI$sp(Utils.scala:1832)
at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
at org.apache.spark.util.Utils$.startServiceOnPort(Utils.scala:1823)
at org.apache.spark.util.AkkaUtils$.createActorSystem(AkkaUtils.scala:57)
at org.apache.spark.SparkEnv$.create(SparkEnv.scala:223)
at org.apache.spark.SparkEnv$.createDriverEnv(SparkEnv.scala:163)
at org.apache.spark.SparkContext.createSparkEnv(SparkContext.scala:267)
at org.apache.spark.SparkContext.<init>(SparkContext.scala:270)
at org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:61)
at Test.main(Test.java:9)

There is no reference to akka anywhere in the code / pom file.
Any fixes?

Thanks,
Ritesh


Does spark performance really scale out with multiple machines?

2015-06-15 Thread Wang, Ningjun (LNG-NPV)
I'm trying to measure how Spark standalone cluster performance scales out with
multiple machines. I did a test of training an SVM model, which is heavy in
memory computation. I measured the run times for a Spark standalone cluster of
1 - 3 nodes; the results follow:

1 node: 35 minutes
2 nodes: 30.1 minutes
3 nodes: 30.8 minutes

So the speed does not seem to increase much with more machines. I know there
is overhead for coordinating tasks among different machines; it seems to me the
overhead is over 30% of the total run time.

Is this typical? Does anybody see significant performance increase with more 
machines? Is there anything I can tune my spark cluster to make it scale out 
with more machines?

Thanks
Ningjun



DataFrame insertIntoJDBC parallelism while writing data into a DB table

2015-06-15 Thread Mohammad Tariq
Hello list,

The method *insertIntoJDBC(url: String, table: String, overwrite: Boolean)*
provided by Spark DataFrame allows us to copy a DataFrame into a JDBC DB
table. Similar functionality is provided by the *createJDBCTable(url:
String, table: String, allowExisting: Boolean) *method. But if you look at
the docs, they say that *createJDBCTable *runs a *bunch of Insert statements*
in order to copy the data, while the docs of *insertIntoJDBC *don't have
any such statement.

Could someone please shed some light on this? How exactly data gets
inserted using *insertIntoJDBC *method?

And if it is the same as the *createJDBCTable *method, then what exactly does
*bunch of Insert statements* mean? What are the criteria for deciding the
number of *inserts per bunch*? How are these bunches generated?

*An example* could be creating a DataFrame by reading all the files stored
in a given directory. If I just do *DataFrame.save()*, it'll create the
same number of output files as the input files. What'll happen in case of
*DataFrame.df.insertIntoJDBC()*?

I'm really sorry to pester you with questions, but I could not get much help
by Googling this.

Thank you so much for your valuable time. I really appreciate it.

Tariq, Mohammad
about.me/mti


Error using spark 1.3.0 with maven

2015-06-15 Thread Ritesh Kumar Singh
Hi,

I'm getting this error while running Spark as a Java project using Maven:

15/06/15 17:11:38 INFO SparkContext: Running Spark version 1.3.0
15/06/15 17:11:38 WARN NativeCodeLoader: Unable to load native-hadoop
library for your platform... using builtin-java classes where applicable
15/06/15 17:11:38 INFO SecurityManager: Changing view acls to: root
15/06/15 17:11:38 INFO SecurityManager: Changing modify acls to: root
15/06/15 17:11:38 INFO SecurityManager: SecurityManager: authentication
disabled; ui acls disabled; users with view permissions: Set(root); users
with modify permissions: Set(root)
Exception in thread "main" com.typesafe.config.ConfigException$Missing: No configuration setting found for key 'akka.version'
at com.typesafe.config.impl.SimpleConfig.findKey(SimpleConfig.java:124)
at com.typesafe.config.impl.SimpleConfig.find(SimpleConfig.java:145)
at com.typesafe.config.impl.SimpleConfig.find(SimpleConfig.java:151)
at com.typesafe.config.impl.SimpleConfig.find(SimpleConfig.java:159)
at com.typesafe.config.impl.SimpleConfig.find(SimpleConfig.java:164)
at com.typesafe.config.impl.SimpleConfig.getString(SimpleConfig.java:206)
at akka.actor.ActorSystem$Settings.<init>(ActorSystem.scala:168)
at akka.actor.ActorSystemImpl.<init>(ActorSystem.scala:504)
at akka.actor.ActorSystem$.apply(ActorSystem.scala:141)
at akka.actor.ActorSystem$.apply(ActorSystem.scala:118)
at org.apache.spark.util.AkkaUtils$.org$apache$spark$util$AkkaUtils$$doCreateActorSystem(AkkaUtils.scala:122)
at org.apache.spark.util.AkkaUtils$$anonfun$1.apply(AkkaUtils.scala:55)
at org.apache.spark.util.AkkaUtils$$anonfun$1.apply(AkkaUtils.scala:54)
at org.apache.spark.util.Utils$$anonfun$startServiceOnPort$1.apply$mcVI$sp(Utils.scala:1832)
at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
at org.apache.spark.util.Utils$.startServiceOnPort(Utils.scala:1823)
at org.apache.spark.util.AkkaUtils$.createActorSystem(AkkaUtils.scala:57)
at org.apache.spark.SparkEnv$.create(SparkEnv.scala:223)
at org.apache.spark.SparkEnv$.createDriverEnv(SparkEnv.scala:163)
at org.apache.spark.SparkContext.createSparkEnv(SparkContext.scala:267)
at org.apache.spark.SparkContext.<init>(SparkContext.scala:270)
at org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:61)
at Test.main(Test.java:9)


==

My Test.java file contains the following:

import org.apache.spark.api.java.*;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.Function;

public class Test {
  public static void main(String[] args) {
    String logFile = "/code/data.txt";
    SparkConf conf = new
        SparkConf().setMaster("local[4]").setAppName("Simple Application");
    JavaSparkContext sc = new JavaSparkContext(conf);
    JavaRDD<String> logData = sc.textFile(logFile);

    long numAs = logData.filter(new Function<String, Boolean>() {
      public Boolean call(String s) { return s.contains("a"); }
    }).count();

    long numBs = logData.filter(new Function<String, Boolean>() {
      public Boolean call(String s) { return s.contains("b"); }
    }).count();

    System.out.println("Lines with a: " + numAs + ", lines with b: " +
        numBs);
  }
}


==

My pom file contains the following:


<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>dexample</groupId>
  <artifactId>sparktest</artifactId>
  <name>Testing spark with maven</name>
  <packaging>jar</packaging>
  <version>1.0-SNAPSHOT</version>

  <dependencies>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.10</artifactId>
      <version>1.3.0</version>
    </dependency>
  </dependencies>

  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-jar-plugin</artifactId>
        <version>2.6</version>
        <configuration>
          <finalName>sparktest</finalName>
          <archive>
            <manifest>
              <addClasspath>true</addClasspath>
              <mainClass>Test</mainClass>
              <classpathPrefix>dependency-jars/</classpathPrefix>
            </manifest>
          </archive>
        </configuration>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>3.3</version>
        <configuration>
          <source>1.7</source>
          <target>1.7</target>
        </configuration>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-assembly-plugin</artifactId>
        <executions>
          <execution>
            <goals>
              <goal>attached</goal>
            </goals>
            <phase>package</phase>
            <configuration>
              <finalName>sparktest</finalName>
              <descriptorRefs>
                <descriptorRef>jar-with-dependencies</descriptorRef>
              </descriptorRefs>
              <archive>
                <manifest>
                  <mainClass>Test</mainClass>
                </manifest>
              </archive>
            </configuration>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
</project>
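One thing I plan to try next (untested): I have read that building a fat jar
can drop Akka's reference.conf (which is where akka.version is defined), and
that switching from the assembly plugin to the maven-shade-plugin with an
appending transformer for reference.conf avoids this. Roughly:

      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <version>2.3</version>
        <executions>
          <execution>
            <phase>package</phase>
            <goals>
              <goal>shade</goal>
            </goals>
            <configuration>
              <transformers>
                <!-- merge all reference.conf files instead of keeping only one -->
                <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                  <resource>reference.conf</resource>
                </transformer>
              </transformers>
            </configuration>
          </execution>
        </executions>
      </plugin>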


Re: How can I use Tachyon with SPARK?

2015-06-15 Thread Himanshu Mehra
Hi June,

As I understand your problem, you are running Spark 1.3 and want to use
Tachyon with it. What you need to do is simply build the latest Spark and
Tachyon and set some configuration in Spark. In fact, Spark 1.3 has
"spark/core/pom.xml": you have to find the "core" folder in your Spark home,
and inside it there must be a "pom.xml" file. If it's not there, I suspect
your Spark code is broken or incomplete, and you should download the source
code again. What you need to do is find the Tachyon dependency in that
pom.xml and set its version as you want. In addition, there is some
configuration you need to do in Spark to enable Tachyon; here is a list of
SparkConf properties regarding Tachyon:
https://docs.sigmoidanalytics.com/index.php/Configuration_Settings

And if you find any difficulty building or recompiling Spark, you should
refer to this doc: https://spark.apache.org/docs/1.3.0/building-spark.html
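
As a minimal illustration of using Tachyon-backed off-heap storage from Spark
1.3 (a sketch; the Tachyon master URL below is the documented default and only
an assumption, so adjust it to your deployment):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

val conf = new SparkConf()
  .setAppName("tachyon-offheap-sketch")
  .set("spark.tachyonStore.url", "tachyon://localhost:19998") // assumed master address
val sc = new SparkContext(conf)

// In Spark 1.x, OFF_HEAP persistence stores RDD blocks in Tachyon
val rdd = sc.parallelize(1 to 1000000)
rdd.persist(StorageLevel.OFF_HEAP)
println(rdd.count())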

For any further assistance, please write back.

Thank you
Himanshu



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-can-I-use-Tachyon-with-SPARK-tp23312p23317.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



spark sql and cassandra. spark generate 769 tasks to read 3 lines from cassandra table

2015-06-15 Thread Serega Sheypak
Hi, I'm running Spark SQL against a Cassandra table. I have 3 C* nodes, each
of them running a Spark worker.
The problem is that Spark runs 769 tasks to read 3 rows: select bar from
foo.
I've tried these properties:

#try to avoid 769 tasks per dummy select foo from bar qeury
spark.cassandra.input.split.size_in_mb=32mb
spark.cassandra.input.fetch.size_in_rows=1000
spark.cassandra.input.split.size=1

but it doesn't help.

Here are the mean metrics for the job:
input1= 8388608.0 TB
input2 = -320 B
input3 = -400 B

I'm confused by these input figures; there are only 3 rows in the C* table.
I definitely don't have 8388608.0 TB of data :)


Not getting event logs >= spark 1.3.1

2015-06-15 Thread Tsai Li Ming
Hi,

I have this in my spark-defaults.conf (same for hdfs):
spark.eventLog.enabled  true
spark.eventLog.dir  file:/tmp/spark-events
spark.history.fs.logDirectory   file:/tmp/spark-events

While the app is running, there is a “.inprogress” directory. However, when the
job completes, the directory is always empty.

I’m submitting the job like this, using either the Pi or word count examples:
$ bin/spark-submit 
/opt/spark-1.4.0-bin-hadoop2.6/examples/src/main/python/wordcount.py 

This used to work in 1.2.1; I didn’t test 1.3.0.


Regards,
Liming






-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark DataFrame Reduce Job Took 40s for 6000 Rows

2015-06-15 Thread Todd Nist
Hi Proust,

Is it possible to see the query you are running, and can you run EXPLAIN
EXTENDED to show the physical plan for the query? To generate the plan, you
can do something like this from $SPARK_HOME/bin/beeline:

0: jdbc:hive2://localhost:10001> explain extended select * from
YourTableHere;

-Todd

On Mon, Jun 15, 2015 at 10:57 AM, Proust GZ Feng  wrote:

> Thanks a lot Akhil, after try some suggestions in the tuning guide, there
> seems no improvement at all.
>
> And below is the job detail when running locally(8cores) which took 3min
> to complete the job, we can see it is the map operation took most of time,
> looks like the mapPartitions took too long
>
> Is there any additional idea? Thanks a lot.
>
> Proust
>
>
>
>
> From:Akhil Das 
> To:Proust GZ Feng/China/IBM@IBMCN
> Cc:"user@spark.apache.org" 
> Date:06/15/2015 03:02 PM
> Subject:Re: Spark DataFrame Reduce Job Took 40s for 6000 Rows
> --
>
>
>
> Have a look here *https://spark.apache.org/docs/latest/tuning.html*
> 
>
> Thanks
> Best Regards
>
> On Mon, Jun 15, 2015 at 11:27 AM, Proust GZ Feng <*pf...@cn.ibm.com*
> > wrote:
> Hi, Spark Experts
>
> I have played with Spark several weeks, after some time testing, a reduce
> operation of DataFrame cost 40s on a cluster with 5 datanode executors.
> And the back-end rows is about 6,000, is this a normal case? Such
> performance looks too bad because in Java a loop for 6,000 rows cause just
> several seconds
>
> I'm wondering any document I should read to make the job much more fast?
>
>
>
>
> Thanks in advance
> Proust
>
>


number of partitions in join: Spark documentation misleading!

2015-06-15 Thread mrm
Hi all,

I was looking for an explanation of the number of partitions for a joined
RDD.

The documentation of Spark 1.3.1 says:
"For distributed shuffle operations like reduceByKey and join, the largest
number of partitions in a parent RDD."
https://spark.apache.org/docs/latest/configuration.html

And the Partitioner.scala comments (line 51) state that:
"Unless spark.default.parallelism is set, the number of partitions will be
the same as the number of partitions in the largest upstream RDD, as this
should be least likely to cause out-of-memory errors."

But this is misleading for the Python API where if you do rddA.join(rddB),
the output number of partitions is the number of partitions of A plus the
number of partitions of B!
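
As a workaround, the number of output partitions can be forced explicitly. A
minimal sketch in the Scala API (assuming an existing SparkContext sc); the
Python join takes an analogous numPartitions argument:

// Hedged sketch: join(other, numPartitions) fixes the output partition count
// regardless of how many partitions the parents have.
val rddA = sc.parallelize(Seq((1, "a"), (2, "b")), 4)
val rddB = sc.parallelize(Seq((1, "x"), (2, "y")), 6)
val joined = rddA.join(rddB, numPartitions = 4)
println(joined.partitions.length) // 4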






Re: Spark DataFrame Reduce Job Took 40s for 6000 Rows

2015-06-15 Thread Proust GZ Feng
Thanks a lot Akhil. After trying some suggestions in the tuning guide, there
seems to be no improvement at all.

Below is the job detail when running locally (8 cores), where the job took
about 3 minutes to complete. We can see that the map operation took most of
the time; it looks like the mapPartitions step took too long.

Are there any other ideas? Thanks a lot.

Proust




From:   Akhil Das 
To: Proust GZ Feng/China/IBM@IBMCN
Cc: "user@spark.apache.org" 
Date:   06/15/2015 03:02 PM
Subject:Re: Spark DataFrame Reduce Job Took 40s for 6000 Rows



Have a look here https://spark.apache.org/docs/latest/tuning.html

Thanks
Best Regards

On Mon, Jun 15, 2015 at 11:27 AM, Proust GZ Feng  wrote:
Hi, Spark Experts 

I have played with Spark for several weeks. After some testing, a reduce 
operation on a DataFrame costs 40s on a cluster with 5 datanode executors, 
and the backing data is only about 6,000 rows. Is this a normal case? Such 
performance looks too bad, because in Java a loop over 6,000 rows takes just 
a few seconds.

I'm wondering what documentation I should read to make the job much faster? 




Thanks in advance 
Proust 



Re: Spark standalone mode and kerberized cluster

2015-06-15 Thread Borja Garrido Bear
I tried running the job in a standalone cluster and I'm getting this:

java.io.IOException: Failed on local exception: java.io.IOException:
org.apache.hadoop.security.AccessControlException: Client cannot
authenticate via:[TOKEN, KERBEROS]; Host Details : local host is:
"worker-node/0.0.0.0"; destination host is: "hdfs":9000;


Both nodes can access HDFS when running Spark locally, and they have valid
Kerberos credentials. I know that for the moment keytabs are not supported
in standalone mode, but as long as the tokens I had when starting the
workers and master are valid, this should work, shouldn't it?



On Thu, Jun 11, 2015 at 10:22 AM, Steve Loughran 
wrote:

>  That's spark on YARN in Kerberos
>
>  In Spark 1.3 you can submit work to a Kerberized Hadoop cluster; once
> the tokens you passed up with your app submission expire (~72 hours) your
> job can't access HDFS any more.
>
>  That's been addressed in Spark 1.4, where you can now specify a kerberos
> keytab for the application master; the AM will then give the workers
> updated tokens when needed.
>
>  The kerberos authentication is all related to the HDFS interaction, YARN
> itself, and the way Kerberized YARN runs your work under your userid, not
> "mapred" or "yarn"
> It will also handle SPNEGO authentication between your web browser and the
> Spark UI (which is redirected via the YARN RM Proxy to achieve this)
>
>  it does not do anything about Akka-based IPC between your client code
> and the spark application
>
>  -steve
>
>  On 11 Jun 2015, at 06:47, Akhil Das  wrote:
>
>  This might help
> http://docs.hortonworks.com/HDPDocuments/HDP2/HDP-2.2.4/Apache_Spark_Quickstart_v224/content/ch_installing-kerb-spark-quickstart.html
>
>  Thanks
> Best Regards
>
> On Wed, Jun 10, 2015 at 6:49 PM, kazeborja  wrote:
>
>> Hello all.
>>
>> I've been reading some old mails and notice that the use of kerberos in a
>> standalone cluster was not supported. Is this stillt he case?
>>
>> Thanks.
>> Borja.
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-standalone-mode-and-kerberized-cluster-tp23255.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>
>


*Metrics API is odd in MLLib

2015-06-15 Thread Sam
According to
https://spark.apache.org/docs/1.4.0/api/scala/index.html#org.apache.spark.mllib.evaluation.BinaryClassificationMetrics

The constructor takes `RDD[(Double, Double)]`, meaning labels are Doubles;
this seems odd, shouldn't they be Boolean? Similarly for MultilabelMetrics
(i.e. it should be RDD[(Array[Double], Array[Boolean])]), and for
MulticlassMetrics shouldn't the type of both be generic?

Additionally, it would be good if either the ROC output type was changed or
another method was added that returned confusion matrices, so that the
hard integer values can be obtained before the divisions. E.g.

```
case class Confusion(tp: Int, fp: Int, fn: Int, tn: Int)
{
  // bunch of methods for each of the things in the table here
https://en.wikipedia.org/wiki/Receiver_operating_characteristic
}
...
def confusions(): RDD[Confusion]
```
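
For contrast, a minimal sketch of how the current constructor is used
(assuming an existing SparkContext `sc`): both elements are Doubles, with the
label squeezed into 0.0 / 1.0, which is what makes the Boolean/generic
variants above feel more natural.

```
import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics

// Hedged sketch: (score, label) pairs, with the label forced into a Double.
val scoreAndLabels = sc.parallelize(Seq((0.9, 1.0), (0.7, 1.0), (0.2, 0.0)))
val metrics = new BinaryClassificationMetrics(scoreAndLabels)
println(metrics.areaUnderROC())
```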


Re: Running spark1.4 inside intellij idea HttpServletResponse - ClassNotFoundException

2015-06-15 Thread Tarek Auel
Hey,

I had some similar issues in the past when I used Java 8. Are you using
Java 7 or 8? (It's just an idea, because I had a similar issue.)
On Mon 15 Jun 2015 at 6:52 am Wwh 吴  wrote:

> name := "SparkLeaning"
>
> version := "1.0"
>
> scalaVersion := "2.10.4"
> //scalaVersion := "2.11.2"
>
> libraryDependencies ++= Seq(
>   //"org.apache.hive"% "hive-jdbc" % "0.13.0"
>   //"io.spray" % "spray-can" % "1.3.1",
>   //"io.spray" % "spray-routing" % "1.3.1",
>   "io.spray" % "spray-testkit" % "1.3.1" % "test",
>   "io.spray" %% "spray-json" % "1.2.6",
>   "com.typesafe.akka" %% "akka-actor" % "2.3.2",
>   "com.typesafe.akka" %% "akka-testkit" % "2.3.2" % "test",
>   "org.scalatest" %% "scalatest" % "2.2.0",
>   "org.apache.spark" %% "spark-core" % "1.4.0",
>   "org.apache.spark" %% "spark-sql" % "1.4.0",
>   "org.apache.spark" %% "spark-hive" % "1.4.0",
>   "org.apache.spark" %% "spark-mllib" % "1.4.0",
>   //"org.apache.hadoop" %% "hadoop-client" % "2.4.0"
>   "javax.servlet" % "javax.servlet-api" % "3.0.1"//,
>   //"org.eclipse.jetty"%%"jetty-servlet"%"8.1.14.v20131031",
>   //"org.eclipse.jetty.orbit"%"javax.servlet"%"3.0.0.v201112011016"
>   //"org.mortbay.jetty"%%"servlet-api"%"3.0.20100224"
>
> )
>
> object SparkPI {
>   def main(args:Array[String]): Unit = {
> val conf = new SparkConf().setAppName("Spark Pi")
> conf.setMaster("local")
>
> val spark = new SparkContext(conf)
> val slices = if (args.length > 0)args(0).toInt else 2
> val n = 10 * slices
> val count = spark.parallelize(1 to n, slices).map{ i =>
>   val x = random * 2 -1
>   val y = random * 2 -1
>   if (x*x + y*y < 1) 1 else 0
> }.reduce(_ + _)
> println("Pi is roughly" + 4.0 * count / n)
> spark.stop()
>   }
> }
>
> When running this program, the following error occurs. Can anyone help?
>
> 15/06/15 21:40:08 INFO HttpServer: Starting HTTP Server
> Exception in thread "main" java.lang.NoClassDefFoundError: 
> javax/servlet/http/HttpServletResponse
>   at 
> org.apache.spark.HttpServer.org$apache$spark$HttpServer$$doStart(HttpServer.scala:75)
>   at org.apache.spark.HttpServer$$anonfun$1.apply(HttpServer.scala:62)
>   at org.apache.spark.HttpServer$$anonfun$1.apply(HttpServer.scala:62)
>   at 
> org.apache.spark.util.Utils$$anonfun$startServiceOnPort$1.apply$mcVI$sp(Utils.scala:1991)
>   at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
>   at org.apache.spark.util.Utils$.startServiceOnPort(Utils.scala:1982)
>   at org.apache.spark.HttpServer.start(HttpServer.scala:62)
>   at org.apache.spark.HttpFileServer.initialize(HttpFileServer.scala:46)
>   at org.apache.spark.SparkEnv$.create(SparkEnv.scala:350)
>   at org.apache.spark.SparkEnv$.createDriverEnv(SparkEnv.scala:188)
>   at org.apache.spark.SparkContext.createSparkEnv(SparkContext.scala:267)
>   at org.apache.spark.SparkContext.(SparkContext.scala:424)
>   at org.learn.SparkPI$.main(SparkPI.scala:24)
>   at org.learn.SparkPI.main(SparkPI.scala)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:606)
>   at com.intellij.rt.execution.application.AppMain.main(AppMain.java:134)
> Caused by: java.lang.ClassNotFoundException: 
> javax.servlet.http.HttpServletResponse
>   at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
>   at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
>   at java.security.AccessController.doPrivileged(Native Method)
>   at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
>   at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
>   at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
>   at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
>   ... 19 more
> 15/06/15 21:40:08 INFO DiskBlockManager: Shutdown hook called
>
>
>


sql.catalyst.ScalaReflection scala.reflect.internal.MissingRequirementError

2015-06-15 Thread patcharee

Hi,

I use Spark 1.4. I tried to create a DataFrame from the RDD below, but got 
scala.reflect.internal.MissingRequirementError


val partitionedTestDF2 = pairVarRDD.toDF("column1","column2","column3")
//pairVarRDD is RDD[Record4Dim_2], and Record4Dim_2 is a Case Class

How can I fix this?
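
A reflection-free alternative would be to build the DataFrame with an explicit
schema via createDataFrame, which sidesteps the TypeTag lookup that fails here.
A minimal sketch (the field types and the Record4Dim_2 accessors below are
hypothetical placeholders):

import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.types.{DoubleType, IntegerType, StructField, StructType}

// Hedged sketch: explicit schema instead of toDF, so no Scala reflection is needed.
val sqlContext = new SQLContext(sc)
val schema = StructType(Seq(
  StructField("column1", IntegerType, nullable = false),
  StructField("column2", IntegerType, nullable = false),
  StructField("column3", DoubleType, nullable = true)))
val rowRDD = pairVarRDD.map(r => Row(r.column1, r.column2, r.column3)) // hypothetical accessors
val partitionedTestDF2 = sqlContext.createDataFrame(rowRDD, schema)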

Exception in thread "main" 
scala.reflect.internal.MissingRequirementError: class etl.Record4Dim_2 
in JavaMirror with sun.misc.Launcher$AppClassLoader@30177039 of type 
class sun.misc.Launcher$AppClassLoader with classpath 
[file:/local/spark140/conf/,file:/local/spark140/lib/spark-assembly-1.4.0-SNAPSHOT-hadoop2.6.0.jar,file:/local/spark140/lib/datanucleus-core-3.2.10.jar,file:/local/spark140/lib/datanucleus-rdbms-3.2.9.jar,file:/local/spark140/lib/datanucleus-api-jdo-3.2.6.jar,file:/etc/hadoop/conf/] 
and parent being sun.misc.Launcher$ExtClassLoader@52c8c6d9 of type class 
sun.misc.Launcher$ExtClassLoader with classpath 
[file:/usr/jdk64/jdk1.7.0_67/jre/lib/ext/sunec.jar,file:/usr/jdk64/jdk1.7.0_67/jre/lib/ext/sunjce_provider.jar,file:/usr/jdk64/jdk1.7.0_67/jre/lib/ext/sunpkcs11.jar,file:/usr/jdk64/jdk1.7.0_67/jre/lib/ext/zipfs.jar,file:/usr/jdk64/jdk1.7.0_67/jre/lib/ext/localedata.jar,file:/usr/jdk64/jdk1.7.0_67/jre/lib/ext/dnsns.jar] 
and parent being primordial classloader with boot classpath 
[/usr/jdk64/jdk1.7.0_67/jre/lib/resources.jar:/usr/jdk64/jdk1.7.0_67/jre/lib/rt.jar:/usr/jdk64/jdk1.7.0_67/jre/lib/sunrsasign.jar:/usr/jdk64/jdk1.7.0_67/jre/lib/jsse.jar:/usr/jdk64/jdk1.7.0_67/jre/lib/jce.jar:/usr/jdk64/jdk1.7.0_67/jre/lib/charsets.jar:/usr/jdk64/jdk1.7.0_67/jre/lib/jfr.jar:/usr/jdk64/jdk1.7.0_67/jre/classes] 
not found.
at 
scala.reflect.internal.MissingRequirementError$.signal(MissingRequirementError.scala:16)
at 
scala.reflect.internal.MissingRequirementError$.notFound(MissingRequirementError.scala:17)
at 
scala.reflect.internal.Mirrors$RootsBase.getModuleOrClass(Mirrors.scala:48)
at 
scala.reflect.internal.Mirrors$RootsBase.getModuleOrClass(Mirrors.scala:61)
at 
scala.reflect.internal.Mirrors$RootsBase.staticModuleOrClass(Mirrors.scala:72)
at 
scala.reflect.internal.Mirrors$RootsBase.staticClass(Mirrors.scala:119)
at 
scala.reflect.internal.Mirrors$RootsBase.staticClass(Mirrors.scala:21)
at 
no.uni.computing.etl.LoadWrfV14$$typecreator1$1.apply(LoadWrfV14.scala:91)
at 
scala.reflect.api.TypeTags$WeakTypeTagImpl.tpe$lzycompute(TypeTags.scala:231)

at scala.reflect.api.TypeTags$WeakTypeTagImpl.tpe(TypeTags.scala:231)
at 
org.apache.spark.sql.catalyst.ScalaReflection$class.localTypeOf(ScalaReflection.scala:71)
at 
org.apache.spark.sql.catalyst.ScalaReflection$class.schemaFor(ScalaReflection.scala:59)
at 
org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:28)
at 
org.apache.spark.sql.SQLContext.createDataFrame(SQLContext.scala:410)
at 
org.apache.spark.sql.SQLContext$implicits$.rddToDataFrameHolder(SQLContext.scala:335)


BR,
Patcharee


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Join highly skewed datasets

2015-06-15 Thread Night Wolf
How far did you get?

On Tue, Jun 2, 2015 at 4:02 PM, ÐΞ€ρ@Ҝ (๏̯͡๏)  wrote:

> We use Scoobi + MR to perform joins and we particularly use blockJoin()
> API of scoobi
>
>
> /** Perform an equijoin with another distributed list where this list is
> considerably smaller
> * than the right (but too large to fit in memory), and where the keys of
> right may be
> * particularly skewed. */
>
>  def blockJoin[B : WireFormat](right: DList[(K, B)]): DList[(K, (A, B))] =
> Relational.blockJoin(left, right)
>
>
> I am trying to do a POC and what Spark join API(s) is recommended to
> achieve something similar ?
>
> Please suggest.
>
> --
> Deepak
>
>


Running spark1.4 inside intellij idea HttpServletResponse - ClassNotFoundException

2015-06-15 Thread Wwh 吴
name := "SparkLeaning"

version := "1.0"

scalaVersion := "2.10.4"
//scalaVersion := "2.11.2"

libraryDependencies ++= Seq(
  //"org.apache.hive"% "hive-jdbc" % "0.13.0"
  //"io.spray" % "spray-can" % "1.3.1",
  //"io.spray" % "spray-routing" % "1.3.1",
  "io.spray" % "spray-testkit" % "1.3.1" % "test",
  "io.spray" %% "spray-json" % "1.2.6",
  "com.typesafe.akka" %% "akka-actor" % "2.3.2",
  "com.typesafe.akka" %% "akka-testkit" % "2.3.2" % "test",
  "org.scalatest" %% "scalatest" % "2.2.0",
  "org.apache.spark" %% "spark-core" % "1.4.0",
  "org.apache.spark" %% "spark-sql" % "1.4.0",
  "org.apache.spark" %% "spark-hive" % "1.4.0",
  "org.apache.spark" %% "spark-mllib" % "1.4.0",
  //"org.apache.hadoop" %% "hadoop-client" % "2.4.0"
  "javax.servlet" % "javax.servlet-api" % "3.0.1"//,
  //"org.eclipse.jetty"%%"jetty-servlet"%"8.1.14.v20131031",
  //"org.eclipse.jetty.orbit"%"javax.servlet"%"3.0.0.v201112011016"
  //"org.mortbay.jetty"%%"servlet-api"%"3.0.20100224"

)

import org.apache.spark.{SparkConf, SparkContext}
import scala.math.random

object SparkPI {
  def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("Spark Pi")
conf.setMaster("local")

val spark = new SparkContext(conf)
val slices = if (args.length > 0)args(0).toInt else 2
val n = 10 * slices
val count = spark.parallelize(1 to n, slices).map{ i =>
  val x = random * 2 -1
  val y = random * 2 -1
  if (x*x + y*y < 1) 1 else 0
}.reduce(_ + _)
println("Pi is roughly" + 4.0 * count / n)
spark.stop()
  }
}

When running this program, the following error occurs. Can anyone help?

15/06/15 21:40:08 INFO HttpServer: Starting HTTP Server
Exception in thread "main" java.lang.NoClassDefFoundError: 
javax/servlet/http/HttpServletResponse
at 
org.apache.spark.HttpServer.org$apache$spark$HttpServer$$doStart(HttpServer.scala:75)
at org.apache.spark.HttpServer$$anonfun$1.apply(HttpServer.scala:62)
at org.apache.spark.HttpServer$$anonfun$1.apply(HttpServer.scala:62)
at 
org.apache.spark.util.Utils$$anonfun$startServiceOnPort$1.apply$mcVI$sp(Utils.scala:1991)
at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
at org.apache.spark.util.Utils$.startServiceOnPort(Utils.scala:1982)
at org.apache.spark.HttpServer.start(HttpServer.scala:62)
at org.apache.spark.HttpFileServer.initialize(HttpFileServer.scala:46)
at org.apache.spark.SparkEnv$.create(SparkEnv.scala:350)
at org.apache.spark.SparkEnv$.createDriverEnv(SparkEnv.scala:188)
at org.apache.spark.SparkContext.createSparkEnv(SparkContext.scala:267)
at org.apache.spark.SparkContext.(SparkContext.scala:424)
at org.learn.SparkPI$.main(SparkPI.scala:24)
at org.learn.SparkPI.main(SparkPI.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:134)
Caused by: java.lang.ClassNotFoundException: 
javax.servlet.http.HttpServletResponse
at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
... 19 more
15/06/15 21:40:08 INFO DiskBlockManager: Shutdown hook called
  

settings from props file seem to be ignored in mesos

2015-06-15 Thread Gary Ogden
I'm loading these settings from a properties file:
spark.executor.memory=256M
spark.cores.max=1
spark.shuffle.consolidateFiles=true
spark.task.cpus=1
spark.deploy.defaultCores=1
spark.driver.cores=1
spark.scheduler.mode=FAIR

Once the job is submitted to Mesos, I can go to the Spark UI for that job
(hostname:4040) and look at the Environment tab. I see that those settings
are there.

If I then comment out all those settings and allow spark to use the
defaults, it still appears to use the same settings in mesos.

Under both runs, it still shows 1 task, 3 cpu, 1GB memory.

Nothing seems to change no matter what is put in that props file, even if
they show up in the spark environment tab.
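
As a cross-check, the same settings can be set programmatically on the
SparkConf, which takes precedence over spark-defaults.conf and any properties
file. A minimal sketch (the app name is hypothetical):

import org.apache.spark.SparkConf

// Hedged sketch: properties set directly on SparkConf should win over file-based ones.
val conf = new SparkConf()
  .setAppName("mesos-settings-check")
  .set("spark.executor.memory", "256m")
  .set("spark.cores.max", "1")
  .set("spark.task.cpus", "1")
  .set("spark.shuffle.consolidateFiles", "true")
  .set("spark.scheduler.mode", "FAIR")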


tasks won't run on mesos when using fine grained

2015-06-15 Thread Gary Ogden
My Mesos cluster has 1.5 CPU and 17GB free.  If I set:

conf.set("spark.mesos.coarse", "true");
conf.set("spark.cores.max", "1");

in the SparkConf object, the job will run in the mesos cluster fine.

But if I comment out those settings above so that it defaults to fine
grained, the task never finishes. It just shows as 0 for everything in the
mesos frameworks (# of tasks, cpu, memory are all 0).  There's nothing in
the log files anywhere as to what's going on.

Thanks


Using queueStream

2015-06-15 Thread anshu shukla
JavaDStream inputStream = ssc.queueStream(rddQueue);

Can this rddQueue be dynamic in nature? If yes, how can I make it keep
running until the rddQueue is exhausted?

Is there any other way to build the rddQueue from a dynamically updated
normal queue?
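
For reference, a minimal sketch of the Scala queueStream API with a mutable
queue that keeps being filled after the context has started; the batch
interval, RDD contents and the sleep loop are hypothetical:

import scala.collection.mutable
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Hedged sketch: the queue is polled every batch interval, so RDDs pushed
// into it while the context is running are picked up dynamically.
val sparkConf = new SparkConf().setAppName("queue-demo").setMaster("local[2]")
val ssc = new StreamingContext(sparkConf, Seconds(1))
val rddQueue = new mutable.Queue[RDD[Int]]()
val inputStream = ssc.queueStream(rddQueue)
inputStream.count().print()
ssc.start()

for (_ <- 1 to 10) {
  rddQueue.synchronized { rddQueue += ssc.sparkContext.makeRDD(1 to 100) }
  Thread.sleep(1000)
}
ssc.stop()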

-- 
Thanks & Regards,
SERC-IISC
Anshu Shukla


Re: BigDecimal problem in parquet file

2015-06-15 Thread Bipin Nag
Hi Davies,

I have tried the recent 1.4 and 1.5-SNAPSHOT to 1) open the parquet file and
save it again, or 2) apply the schema to the RDD and save the DataFrame as
parquet, but now I get this error (right at the beginning):

java.lang.OutOfMemoryError: Java heap space
at
parquet.bytes.CapacityByteArrayOutputStream.initSlabs(CapacityByteArrayOutputStream.java:65)
at
parquet.bytes.CapacityByteArrayOutputStream.(CapacityByteArrayOutputStream.java:57)
at
parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.(ColumnChunkPageWriteStore.java:68)
at
parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.(ColumnChunkPageWriteStore.java:48)
at
parquet.hadoop.ColumnChunkPageWriteStore.getPageWriter(ColumnChunkPageWriteStore.java:215)
at
parquet.column.impl.ColumnWriteStoreImpl.newMemColumn(ColumnWriteStoreImpl.java:67)
at
parquet.column.impl.ColumnWriteStoreImpl.getColumnWriter(ColumnWriteStoreImpl.java:56)
at
parquet.io.MessageColumnIO$MessageColumnIORecordConsumer.(MessageColumnIO.java:178)
at parquet.io.MessageColumnIO.getRecordWriter(MessageColumnIO.java:369)
at
parquet.hadoop.InternalParquetRecordWriter.initStore(InternalParquetRecordWriter.java:108)
at
parquet.hadoop.InternalParquetRecordWriter.(InternalParquetRecordWriter.java:94)
at
parquet.hadoop.ParquetRecordWriter.(ParquetRecordWriter.java:64)
at
parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:282)
at
parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:252)
at
org.apache.spark.sql.parquet.ParquetOutputWriter.(newParquet.scala:111)
at
org.apache.spark.sql.parquet.ParquetRelation2$$anon$4.newInstance(newParquet.scala:244)
at
org.apache.spark.sql.sources.DefaultWriterContainer.initWriters(commands.scala:386)
at
org.apache.spark.sql.sources.BaseWriterContainer.executorSideSetup(commands.scala:298)
at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation.org
$apache$spark$sql$sources$InsertIntoHadoopFsRelation$$writeRows$1(commands.scala:142)
at
org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insert$1.apply(commands.scala:132)
at
org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insert$1.apply(commands.scala:132)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
at org.apache.spark.scheduler.Task.run(Task.scala:70)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)


Caused by: java.lang.OutOfMemoryError: GC overhead limit exceeded
at parquet.io.api.Binary.fromByteArray(Binary.java:159)
at
parquet.column.values.dictionary.PlainValuesDictionary$PlainBinaryDictionary.(PlainValuesDictionary.java:94)
at
parquet.column.values.dictionary.PlainValuesDictionary$PlainBinaryDictionary.(PlainValuesDictionary.java:67)
at parquet.column.Encoding$4.initDictionary(Encoding.java:131)
at
parquet.column.impl.ColumnReaderImpl.(ColumnReaderImpl.java:325)
at
parquet.column.impl.ColumnReadStoreImpl.newMemColumnReader(ColumnReadStoreImpl.java:63)
at
parquet.column.impl.ColumnReadStoreImpl.getColumnReader(ColumnReadStoreImpl.java:58)
at
parquet.io.RecordReaderImplementation.(RecordReaderImplementation.java:267)
at parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:131)
at parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:96)
at
parquet.filter2.compat.FilterCompat$NoOpFilter.accept(FilterCompat.java:136)
at parquet.io.MessageColumnIO.getRecordReader(MessageColumnIO.java:96)
at
parquet.hadoop.InternalParquetRecordReader.checkRead(InternalParquetRecordReader.java:126)
at
parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:193)
at
parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:204)
at
org.apache.spark.sql.sources.SqlNewHadoopRDD$$anon$1.hasNext(SqlNewHadoopRDD.scala:163)
at
org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation.org
$apache$spark$sql$sources$InsertIntoHadoopFsRelation$$writeRows$1(commands.scala:152)

I am not sure if this is related to your patch or some other bug. The
original error doesn't show up in newer versions, so this is the problem to
fix now.

Thanks

On 13 June 2015 at 06:31, Davies Liu  wrote:

> Maybe it's related to a bug, which is fixed by
> https://github.com/apache/spark/pull/6558 recently.
>
> On Fri, Jun 12, 2015 at 5:38 AM, Bipin Nag  wrote:
> > Hi Cheng,
> >
> > Yes, some rows contain unit instead of decimal values. I believe some
> rows
> > from original source I had don't have any value i.e. it is null. And that
> > shows up as unit. How 

Re: UNRESOLVED DEPENDENCIES while building Spark 1.3.0

2015-06-15 Thread François Garillot
You may want to have a look at these if you (or the original author) are
using JDK 8:

https://issues.apache.org/jira/browse/SPARK-4193
https://issues.apache.org/jira/browse/SPARK-4543

Cheers,

On Sat, Apr 4, 2015 at 10:39 PM mas  wrote:

> Hi All,
> I am trying to build spark 1.3.O on standalone Ubuntu 14.04. I am using the
> sbt command i.e. "sbt/sbt assembly" to build it. This command works pretty
> good with spark version 1.1 however, it gives following error with spark
> 1.3.0. Any help or suggestions to resolve this would highly be appreciated.
>
> [info] Done updating.
> [info] Updating {file:/home/roott/aamirTest/spark/}network-shuffle...
> [info] Resolving org.fusesource.jansi#jansi;1.4 ...
> [warn]  ::
> [warn]  ::  UNRESOLVED DEPENDENCIES ::
> [warn]  ::
> [warn]  :: org.apache.spark#spark-network-common_2.10;1.3.0: configuration not public
> in org.apache.spark#spark-network-common_2.10;1.3.0: 'test'. It was required
> from org.apache.spark#spark-network-shuffle_2.10;1.3.0 test
> [warn]  ::
> [warn]
> [warn]  Note: Unresolved dependencies path:
> [warn]  org.apache.spark:spark-network-common_2.10:1.3.0 ((com.typesafe.sbt.pom.MavenHelper) MavenHelper.scala#L76)
> [warn]+- org.apache.spark:spark-network-shuffle_2.10:1.3.0
> sbt.ResolveException: unresolved dependency: org.apache.spark#spark-network-common_2.10;1.3.0:
> configuration not public in org.apache.spark#spark-network-common_2.10;1.3.0: 'test'.
> It was required from org.apache.spark#spark-network-shuffle_2.10;1.3.0 test
> at sbt.IvyActions$.sbt$IvyActions$$resolve(IvyActions.scala:278)
> at
> sbt.IvyActions$$anonfun$updateEither$1.apply(IvyActions.scala:175)
> at
> sbt.IvyActions$$anonfun$updateEither$1.apply(IvyActions.scala:157)
> at sbt.IvySbt$Module$$anonfun$withModule$1.apply(Ivy.scala:151)
> at sbt.IvySbt$Module$$anonfun$withModule$1.apply(Ivy.scala:151)
> at sbt.IvySbt$$anonfun$withIvy$1.apply(Ivy.scala:128)
> at sbt.IvySbt.sbt$IvySbt$$action$1(Ivy.scala:56)
> at sbt.IvySbt$$anon$4.call(Ivy.scala:64)
> at xsbt.boot.Locks$GlobalLock.withChannel$1(Locks.scala:93)
> at
> xsbt.boot.Locks$GlobalLock.xsbt$boot$Locks$GlobalLock$$withChannelRet
> ries$1(Locks.scala:78)
> at
> xsbt.boot.Locks$GlobalLock$$anonfun$withFileLock$1.apply(Locks.scala:
> 97)
> at xsbt.boot.Using$.withResource(Using.scala:10)
> at xsbt.boot.Using$.apply(Using.scala:9)
> at
> xsbt.boot.Locks$GlobalLock.ignoringDeadlockAvoided(Locks.scala:58)
> at xsbt.boot.Locks$GlobalLock.withLock(Locks.scala:48)
> at xsbt.boot.Locks$.apply0(Locks.scala:31)
> at xsbt.boot.Locks$.apply(Locks.scala:28)
> at sbt.IvySbt.withDefaultLogger(Ivy.scala:64)
> at sbt.IvySbt.withIvy(Ivy.scala:123)
> at sbt.IvySbt.withIvy(Ivy.scala:120)
> at sbt.IvySbt$Module.withModule(Ivy.scala:151)
> at sbt.IvyActions$.updateEither(IvyActions.scala:157)
> at
> sbt.Classpaths$$anonfun$sbt$Classpaths$$work$1$1.apply(Defaults.scala
> :1318)
> at
> sbt.Classpaths$$anonfun$sbt$Classpaths$$work$1$1.apply(Defaults.scala
> :1315)
> at
> sbt.Classpaths$$anonfun$doWork$1$1$$anonfun$85.apply(Defaults.scala:1
> 345)
> at
> sbt.Classpaths$$anonfun$doWork$1$1$$anonfun$85.apply(Defaults.scala:1
> 343)
> at sbt.Tracked$$anonfun$lastOutput$1.apply(Tracked.scala:35)
> at sbt.Classpaths$$anonfun$doWork$1$1.apply(Defaults.scala:1348)
> at sbt.Classpaths$$anonfun$doWork$1$1.apply(Defaults.scala:1342)
> at sbt.Tracked$$anonfun$inputChanged$1.apply(Tracked.scala:45)
> at sbt.Classpaths$.cachedUpdate(Defaults.scala:1360)
> at sbt.Classpaths$$anonfun$updateTask$1.apply(Defaults.scala:1300)
> at sbt.Classpaths$$anonfun$updateTask$1.apply(Defaults.scala:1275)
> at scala.Function1$$anonfun$compose$1.apply(Function1.scala:47)
> at
> sbt.$tilde$greater$$anonfun$$u2219$1.apply(TypeFunctions.scala:40)
> at sbt.std.Transform$$anon$4.work(System.scala:63)
> at
> sbt.Execute$$anonfun$submit$1$$anonfun$apply$1.apply(Execute.scala:22
> 6)
> at
> sbt.Execute$$anonfun$submit$1$$anonfun$apply$1.apply(Execute.scala:22
> 6)
> at sbt.ErrorHandling$.wideConvert(ErrorHandling.scala:17)
> at sbt.Execute.work(Execute.scala:235)
> at sbt.Execute$$anonfun$submit$1.apply(Execute.scala:226)
> at sbt.Execute$$anonfun$submit$1.apply(Execute.scala:226)
> at
> sbt.ConcurrentRestrictions$$anon$4$$anonfun$1.apply(ConcurrentRestric
> tions.scala:159)
> at sbt.CompletionService$$anon$2.call(CompletionService.scala:28)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at
> java.util.concurrent.Executors$

Re: Reading file from S3, facing java.lang.NoClassDefFoundError: org/jets3t/service/ServiceException

2015-06-15 Thread shahab
Thanks Akhil, it solved the problem.

best
/Shahab

On Fri, Jun 12, 2015 at 8:50 PM, Akhil Das 
wrote:

> Looks like your Spark is not able to pick up the HADOOP_CONF. To fix this,
> you can actually add jets3t-0.9.0.jar to the classpath
> (sc.addJar("/path/to/jets3t-0.9.0.jar")).
>
> Thanks
> Best Regards
>
> On Thu, Jun 11, 2015 at 6:44 PM, shahab  wrote:
>
>> Hi,
>>
>> I tried to read a csv file from amazon s3, but I get the following
>> exception which I have no clue how to solve this. I tried both spark 1.3.1
>> and 1.2.1, but no success.  Any idea how to solve this is appreciated.
>>
>>
>> best,
>> /Shahab
>>
>> the code:
>>
>> val hadoopConf=sc.hadoopConfiguration;
>>
>>  hadoopConf.set("fs.s3.impl",
>> "org.apache.hadoop.fs.s3native.NativeS3FileSystem")
>>
>>  hadoopConf.set("fs.s3.awsAccessKeyId", aws_access_key_id)
>>
>>  hadoopConf.set("fs.s3.awsSecretAccessKey", aws_secret_access_key)
>>
>>  val csv = sc.textFile("s3n://mybucket/info.csv")  // original file
>>
>>  val data = csv.map(line => line.split(",").map(elem => elem.trim)) //lines
>> in rows
>>
>>
>> Here is the exception I faced:
>>
>> Exception in thread "main" java.lang.NoClassDefFoundError:
>> org/jets3t/service/ServiceException
>>
>> at org.apache.hadoop.fs.s3native.NativeS3FileSystem.createDefaultStore(
>> NativeS3FileSystem.java:280)
>>
>> at org.apache.hadoop.fs.s3native.NativeS3FileSystem.initialize(
>> NativeS3FileSystem.java:270)
>>
>> at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2397)
>>
>> at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:89)
>>
>> at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2431
>> )
>>
>> at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2413)
>>
>> at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:368)
>>
>> at org.apache.hadoop.fs.Path.getFileSystem(Path.java:296)
>>
>> at org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(
>> FileInputFormat.java:256)
>>
>> at org.apache.hadoop.mapred.FileInputFormat.listStatus(
>> FileInputFormat.java:228)
>>
>> at org.apache.hadoop.mapred.FileInputFormat.getSplits(
>> FileInputFormat.java:304)
>>
>> at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:203)
>>
>> at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
>>
>> at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
>>
>> at scala.Option.getOrElse(Option.scala:120)
>>
>> at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
>>
>> at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(
>> MapPartitionsRDD.scala:32)
>>
>> at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
>>
>> at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
>>
>> at scala.Option.getOrElse(Option.scala:120)
>>
>> at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
>>
>> at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(
>> MapPartitionsRDD.scala:32)
>>
>> at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
>>
>> at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
>>
>> at scala.Option.getOrElse(Option.scala:120)
>>
>> at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
>>
>> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1512)
>>
>> at org.apache.spark.rdd.RDD.count(RDD.scala:1006)
>>
>
>


RE: If not stop StreamingContext gracefully, will checkpoint data be consistent?

2015-06-15 Thread Haopu Wang
Akhil, thank you for the response. I want to explore more.

 

Suppose the application is just monitoring an HDFS folder and outputs the
word count of each streaming batch, also to HDFS.

 

When I kill the application _before_ Spark takes a checkpoint, then after
recovery Spark will resume processing from the timestamp of the latest
checkpoint. That means some files will be processed twice and duplicate
results are generated.

 

Please correct me if the understanding is wrong, thanks again!

 



From: Akhil Das [mailto:ak...@sigmoidanalytics.com] 
Sent: Monday, June 15, 2015 3:48 PM
To: Haopu Wang
Cc: user
Subject: Re: If not stop StreamingContext gracefully, will checkpoint
data be consistent?

 

I think it should be fine, that's the whole point of check-pointing (in
case of driver failure etc).




Thanks

Best Regards

 

On Mon, Jun 15, 2015 at 6:54 AM, Haopu Wang  wrote:

Hi, can someone help to confirm the behavior? Thank you!


-Original Message-
From: Haopu Wang
Sent: Friday, June 12, 2015 4:57 PM
To: user
Subject: If not stop StreamingContext gracefully, will checkpoint data
be consistent?

This is a quick question about Checkpoint. The question is: if the
StreamingContext is not stopped gracefully, will the checkpoint be
consistent?
Or should I always gracefully shut down the application in order to
use the checkpoint?

Thank you very much!


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org

 



Re: How to set up a Spark Client node?

2015-06-15 Thread ayan guha
I feel he wanted to ask about workers. In that case, please launch workers
on Nodes 3, 4, 5 (and/or Nodes 8, 9, 10 etc.).
You need to go to each worker and start the worker daemon with the master
URL:port (typically 7077) as a parameter (so workers can talk to the master).

You should be able to see 1 master and N workers in the UI, which typically
starts on master URL:8080.

Once you do that, you follow Akhil's instruction above to get a sqlContext,
set the master property properly, and run your app.
HTH

On Mon, Jun 15, 2015 at 7:02 PM, Akhil Das 
wrote:

> I'm assuming by spark-client you mean the spark driver program. In that
> case you can pick any machine (say Node 7), create your driver program in
> it and use spark-submit to submit it to the cluster or if you create the
> SparkContext within your driver program (specifying all the properties)
> then you may simply run it with sbt run.
>
> Thanks
> Best Regards
>
> On Sun, Jun 14, 2015 at 6:17 AM, MrAsanjar .  wrote:
>
>> I have following hadoop & spark cluster nodes configuration:
>> Nodes 1 & 2 are resourceManager and nameNode respectivly
>> Nodes 3, 4, and 5 each includes nodeManager & dataNode
>> Node 7 is Spark-master configured to run yarn-client or yarn-master modes
>> I have tested it and it works fine.
>> Is there any instuctions on how to setup spark client in a cluster mode?
>> I am not sure if I am doing it right.
>> Thanks in advance
>>
>
>


-- 
Best Regards,
Ayan Guha


Re: How to use spark for map-reduce flow to filter N columns, top M rows of all csv files under a folder?

2015-06-15 Thread Akhil Das
Something like this?

val huge_data = sc.textFile("/path/to/first.csv").map(x =>
  (x.split("\t")(1), x.split("\t")(0)))
val gender_data = sc.textFile("/path/to/second.csv").map(x =>
  (x.split("\t")(0), x))

val joined_data = huge_data.join(gender_data)

joined_data.take(1000)


It's Scala btw, the Python API should also be similar.

Thanks
Best Regards

On Sat, Jun 13, 2015 at 12:16 AM, Rex X  wrote:

> To be concrete, say we have a folder with thousands of tab-delimited csv
> files with following attributes format (each csv file is about 10GB):
>
> id    name    address    city    ...
> 1     Matt    add1       LA      ...
> 2     Will    add2       LA      ...
> 3     Lucy    add3       SF      ...
> ...
>
> And we have a lookup table based on "name" above
>
> name    gender
> Matt    M
> Lucy    F
> ...
>
> Now we are interested to output from top 1000 rows of each csv file into
> following format:
>
> id    name    gender
> 1     Matt    M
> ...
>
> Can we use pyspark to efficiently handle this?
>
>
>


RE: Optimizing Streaming from Websphere MQ

2015-06-15 Thread Chaudhary, Umesh
Hi Akhil,
Thanks for your response.
I have 10 cores in total across my 3 machines, and I am running 5-10 receivers.
I have tried to measure the number of records processed per second while
varying the number of receivers.
If I run 10 receivers (i.e. one receiver for each core), I still do not see
any performance benefit from it.
Is it something related to a bottleneck in MQ or in the reliable receiver?
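
For reference, this is roughly how the receivers are fanned out and unioned (a
minimal sketch; the MQReceiver class name, queue name, batch interval and
output path are hypothetical placeholders, and each receiver occupies one core):

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.receiver.Receiver

// Hypothetical stand-in for the custom Websphere MQ receiver described above;
// onStart/onStop are stubbed so the sketch compiles.
class MQReceiver(queueName: String) extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {
  def onStart(): Unit = { /* connect to MQ and call store(record) for each message */ }
  def onStop(): Unit = { /* close the MQ connection */ }
}

val sparkConf = new SparkConf().setAppName("mq-ingest")
val ssc = new StreamingContext(sparkConf, Seconds(1))

// Each receiver takes one core, so leave some of the 10 cores free for processing.
val numReceivers = 5
val streams = (1 to numReceivers).map(_ => ssc.receiverStream(new MQReceiver("MY.QUEUE")))
val unified = ssc.union(streams)
unified.repartition(10).saveAsTextFiles("hdfs:///tmp/mq-output") // hypothetical path

ssc.start()
ssc.awaitTermination()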

From: Akhil Das [mailto:ak...@sigmoidanalytics.com]
Sent: Saturday, June 13, 2015 1:10 AM
To: Chaudhary, Umesh
Cc: user@spark.apache.org
Subject: Re: Optimizing Streaming from Websphere MQ

How many cores are you allocating for your job? And how many receivers are you 
having? It would be good if you can post your custom receiver code, it will 
help people to understand it better and shed some light.

Thanks
Best Regards

On Fri, Jun 12, 2015 at 12:58 PM, Chaudhary, Umesh 
mailto:umesh.chaudh...@searshc.com>> wrote:
Hi,
I have created a Custom Receiver in Java which receives data from Websphere MQ 
and I am only writing the received records on HDFS.

I have referred many forums for optimizing speed of spark streaming 
application. Here I am listing a few:


• Spark 
Official

• VIrdata

•  TD’s Slide (A bit Old but 
Useful)

I got mainly these points applicable to my case:


• giving batch interval as 1 sec

• Controlling “spark.streaming.blockInterval” =200ms

• inputStream.repartition(3)

But that did not improve the actual throughput (records/sec) of the receiver, which is at 
most 5-10 records/sec. This is way less than I expected.
Am I missing something?

Regards,
Umesh Chaudhary
This message, including any attachments, is the property of Sears Holdings 
Corporation and/or one of its subsidiaries. It is confidential and may contain 
proprietary or legally privileged information. If you are not the intended 
recipient, please delete it without reading the contents. Thank you.




Re: How to set up a Spark Client node?

2015-06-15 Thread Akhil Das
I'm assuming by spark-client you mean the spark driver program. In that
case you can pick any machine (say Node 7), create your driver program in
it and use spark-submit to submit it to the cluster or if you create the
SparkContext within your driver program (specifying all the properties)
then you may simply run it with sbt run.

Thanks
Best Regards

On Sun, Jun 14, 2015 at 6:17 AM, MrAsanjar .  wrote:

> I have following hadoop & spark cluster nodes configuration:
> Nodes 1 & 2 are resourceManager and nameNode respectivly
> Nodes 3, 4, and 5 each includes nodeManager & dataNode
> Node 7 is Spark-master configured to run yarn-client or yarn-master modes
> I have tested it and it works fine.
> Is there any instuctions on how to setup spark client in a cluster mode?
> I am not sure if I am doing it right.
> Thanks in advance
>


Worker is KILLED for no reason

2015-06-15 Thread nizang
hi,

I'm using the new 1.4.0 installation, and ran a job there. The job finished
and everything seems fine. When I enter the application, I can see that the
job is marked as KILLED:

Removed Executors

ExecutorID  Worker  Cores   Memory  State   Logs
0   worker-20150615080550-172.31.11.225-51630   4   10240   KILLED  
stdout stderr

when I enter the worker itself, I can see it marked as EXITED:


ExecutorID  Cores   State   Memory  Job Details Logs
0   4   EXITED  10.0 GB 
ID: app-20150615080601-
Name: dev.app.name
User: root
stdout stderr

There is nothing interesting in the stdout or stderr.

Why is the job marked as KILLED on the application page?

This is the only job I ran, and the only job that ran on this executor. Also,
judging by the logs and output, everything seems to have run fine.

thanks, nizan






RE: Join between DStream and Periodically-Changing-RDD

2015-06-15 Thread Evo Eftimov
“turn (keep turning) your HDFS file (Batch RDD) into a stream of messages 
(outside spark streaming)” – what I meant by that was “turn the Updates to your 
HDFS dataset into Messages” and send them as such to spark streaming 

 

From: Evo Eftimov [mailto:evo.efti...@isecc.com] 
Sent: Monday, June 15, 2015 8:38 AM
To: 'Ilove Data'; 'Tathagata Das'
Cc: 'Akhil Das'; 'user'
Subject: RE: Join between DStream and Periodically-Changing-RDD

 

Then go for the second option I suggested - simply turn (keep turning) your 
HDFS file (Batch RDD) into a stream of messages (outside spark streaming) – 
then spark streaming consumes and aggregates the messages FOR THE RUNTIME 
LIFETIME of your application in some of the following ways:

 

1.   Continuous union of DStream RDDs, where you also persist the result (so it 
doesn’t get discarded, which is what happens to DStream RDDs by default in 
Spark Streaming)

2.   Apply one of the window operations, e.g. aggregation, on the DStream RDD, 
with the window being the runtime lifetime of the app 

 

And at the same time you join the DStream RDDs of your actual Streaming Data 
with the above continuously updated DStream RDD representing your HDFS file 

 

From: Ilove Data [mailto:data4...@gmail.com] 
Sent: Monday, June 15, 2015 5:19 AM
To: Tathagata Das
Cc: Evo Eftimov; Akhil Das; user
Subject: Re: Join between DStream and Periodically-Changing-RDD

 

@Akhil Das

Joining two DStreams might not be an option since I want to join the stream 
with historical data in an HDFS folder.

 

@Tathagata Das & @Evo Eftimov

The batch RDD to be reloaded is considerably larger than the DStream data, since 
it is historical data. To be more specific, most of the joins from the stream 
RDDs to the HDFS folder (~90% of the stream data) will hit recent data (the 
last 1-2 days) in the HDFS folder. So it is important to get the most 
up-to-date data.

 

Is there a workaround for that specific case? Since RDDs are not mutable, do I 
need a K-V database for this join with historical data?

 

On Fri, Jun 12, 2015 at 8:14 AM, Tathagata Das  wrote:

Another approach not mentioned is to use a function to get the RDD that is to 
be joined. Something like this.

 

 

Not sure, but you can try something like this also:

 

kvDstream.foreachRDD(rdd => {

  

  val histRDD = getOrUpdateRDD(params...)

  rdd.join(histRDD)

  

  

})

The getOrUpdateRDD() function that you implement will get called every batch 
interval. And you can decide to return the same RDD or an updated RDD when you 
want to. Once updated, if the RDD is going to be used in multiple batch 
intervals, you should cache it. Furthermore, if you are going to join it, you 
should partition it by a partitioner, then cached it and make sure that the 
same partitioner is used for joining. That would be more efficient, as the RDD 
will stay partitioned in memory, minimizing the cost of join. 
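
To make that concrete, here is a minimal sketch of the pattern (the HDFS path,
the comma-separated key layout and the 10-minute reload interval are
hypothetical; ssc and kvDstream come from the earlier snippets and are assumed
to be keyed by the same String key):

import org.apache.spark.HashPartitioner
import org.apache.spark.rdd.RDD

// Hedged sketch: cache a partitioned copy of the historical data and swap it
// out at most every reloadIntervalMs, joining it against each batch.
var histRDD: RDD[(String, String)] = null
var lastLoad = 0L
val partitioner = new HashPartitioner(100)

def getOrUpdateRDD(reloadIntervalMs: Long): RDD[(String, String)] = {
  val now = System.currentTimeMillis()
  if (histRDD == null || now - lastLoad > reloadIntervalMs) {
    if (histRDD != null) histRDD.unpersist()
    histRDD = ssc.sparkContext.textFile("hdfs:///data/history") // hypothetical path
      .map(line => (line.split(",")(0), line))
      .partitionBy(partitioner)
      .cache()
    lastLoad = now
  }
  histRDD
}

kvDstream.foreachRDD { rdd =>
  val hist = getOrUpdateRDD(10 * 60 * 1000L)
  rdd.join(hist).count() // any output operation / action goes here
}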

 

 

On Wed, Jun 10, 2015 at 9:08 AM, Evo Eftimov  wrote:

It depends on how big the Batch RDD requiring reloading is 

 

Reloading it for EVERY single DStream RDD would slow down the stream processing 
inline with the total time required to reload the Batch RDD …..

 

But if the Batch RDD is not that big then that might not be an issues 
especially in the context of the latency requirements for your streaming app

 

Another, more efficient and real-time approach may be to represent your batch 
RDD as DStream RDDs, keep aggregating them over the lifetime of the Spark 
Streaming app instance, and keep joining them with the actual DStream RDDs. 

 

You can feed your HDFS file into a Message Broker topic and consume it from 
there in the form of DStream RDDs which you keep aggregating over the lifetime 
of the spark streaming app instance 

 

From: Akhil Das [mailto:ak...@sigmoidanalytics.com] 
Sent: Wednesday, June 10, 2015 8:36 AM
To: Ilove Data
Cc: user@spark.apache.org
Subject: Re: Join between DStream and Periodically-Changing-RDD

 

RDD's are immutable, why not join two DStreams? 

 

Not sure, but you can try something like this also:

 

kvDstream.foreachRDD(rdd => {

  

  val file = ssc.sparkContext.textFile("/sigmoid/")

  val kvFile = file.map(x => (x.split(",")(0), x))

  

  rdd.join(kvFile)

  

  

})

 




Thanks

Best Regards

 

On Tue, Jun 9, 2015 at 7:37 PM, Ilove Data  wrote:

Hi,

 

I'm trying to join a DStream with a batch interval of, say, 20s with an RDD 
loaded from an HDFS folder which is changing periodically; say a new file 
arrives in the folder every 10 minutes.

 

How should this be done, considering that the files in the HDFS folder are 
periodically changing / new files are being added? Does an RDD automatically 
detect changes in the HDFS folder it was loaded from and reload itself?

 

Thanks!

Rendy

 

 

 



Re: If not stop StreamingContext gracefully, will checkpoint data be consistent?

2015-06-15 Thread Akhil Das
I think it should be fine, that's the whole point of check-pointing (in
case of driver failure etc).

Thanks
Best Regards

On Mon, Jun 15, 2015 at 6:54 AM, Haopu Wang  wrote:

> Hi, can someone help to confirm the behavior? Thank you!
>
> -Original Message-
> From: Haopu Wang
> Sent: Friday, June 12, 2015 4:57 PM
> To: user
> Subject: If not stop StreamingContext gracefully, will checkpoint data
> be consistent?
>
> This is a quick question about Checkpoint. The question is: if the
> StreamingContext is not stopped gracefully, will the checkpoint be
> consistent?
> Or I should always gracefully shutdown the application even in order to
> use the checkpoint?
>
> Thank you very much!
>
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


RE: Join between DStream and Periodically-Changing-RDD

2015-06-15 Thread Evo Eftimov
Then go for the second option I suggested - simply turn (keep turning) your 
HDFS file (Batch RDD) into a stream of messages (outside spark streaming) – 
then spark streaming consumes and aggregates the messages FOR THE RUNTIME 
LIFETIME of your application in some of the following ways:

 

1.   Continuous union of DStream RDDs, where you also persist the result (so it 
doesn’t get discarded, which is what happens to DStream RDDs by default in 
Spark Streaming)

2.   Apply one of the window operations, e.g. aggregation, on the DStream RDD, 
with the window being the runtime lifetime of the app 

 

And at the same time you join the DStream RDDs of your actual Streaming Data 
with the above continuously updated DStream RDD representing your HDFS file 

 

From: Ilove Data [mailto:data4...@gmail.com] 
Sent: Monday, June 15, 2015 5:19 AM
To: Tathagata Das
Cc: Evo Eftimov; Akhil Das; user
Subject: Re: Join between DStream and Periodically-Changing-RDD

 

@Akhil Das

Join two Dstreams might not be an option since I want to join stream with 
historical data in HDFS folder.

 

@Tagatha Das & @Evo Eftimov

Batch RDD to be reloaded is considerably huge compare to Dstream data since it 
is historical data. To be more specific, most of join from rdd stream to hdfs 
folder (~90% of rdd streams data) will hit to recent data (last 1-2 days data) 
in hdfs folder. So it is important to get the most updated data.

 

Is there a workaround for that specific case? Since RDD are not mutable, do I 
need a K-V database for this join with historical data?

 

On Fri, Jun 12, 2015 at 8:14 AM, Tathagata Das  wrote:

Another approach not mentioned is to use a function to get the RDD that is to 
be joined. Something like this.

 

 

Not sure, but you can try something like this also:

 

kvDstream.foreachRDD(rdd => {

  

  val histRDD = getOrUpdateRDD(params...)

  rdd.join(histRDD)

  

  

})

The getOrUpdateRDD() function that you implement will get called every batch 
interval. And you can decide to return the same RDD or an updated RDD when you 
want to. Once updated, if the RDD is going to be used in multiple batch 
intervals, you should cache it. Furthermore, if you are going to join it, you 
should partition it by a partitioner, then cache it, and make sure that the 
same partitioner is used for joining. That would be more efficient, as the RDD 
will stay partitioned in memory, minimizing the cost of join. 

 

 

On Wed, Jun 10, 2015 at 9:08 AM, Evo Eftimov  wrote:

It depends on how big the Batch RDD requiring reloading is 

 

Reloading it for EVERY single DStream RDD would slow down the stream processing 
inline with the total time required to reload the Batch RDD …..

 

But if the Batch RDD is not that big then that might not be an issues 
especially in the context of the latency requirements for your streaming app

 

Another, more efficient and real-time approach may be to represent your batch 
RDD as DStream RDDs, keep aggregating them over the lifetime of the Spark 
Streaming app instance, and keep joining them with the actual DStream RDDs. 

 

You can feed your HDFS file into a Message Broker topic and consume it from 
there in the form of DStream RDDs which you keep aggregating over the lifetime 
of the spark streaming app instance 

 

From: Akhil Das [mailto:ak...@sigmoidanalytics.com] 
Sent: Wednesday, June 10, 2015 8:36 AM
To: Ilove Data
Cc: user@spark.apache.org
Subject: Re: Join between DStream and Periodically-Changing-RDD

 

RDD's are immutable, why not join two DStreams? 

 

Not sure, but you can try something like this also:

 

kvDstream.foreachRDD(rdd => {

  

  val file = ssc.sparkContext.textFile("/sigmoid/")

  val kvFile = file.map(x => (x.split(",")(0), x))

  

  rdd.join(kvFile)

  

  

})

 




Thanks

Best Regards

 

On Tue, Jun 9, 2015 at 7:37 PM, Ilove Data  wrote:

Hi,

 

I'm trying to join DStream with interval let say 20s, join with RDD loaded from 
HDFS folder which is changing periodically, let say new file is coming to the 
folder for every 10 minutes.

 

How should it be done, considering the HDFS files in the folder is periodically 
changing/adding new files? Do RDD automatically detect changes in HDFS folder 
as RDD source and automatically reload RDD?

 

Thanks!

Rendy

 

 

 



[SparkStreaming] NPE in DStreamCheckPointData.scala:125

2015-06-15 Thread Haopu Wang
I use the attached program to test checkpoint. It's quite simple.

 

When I run the program a second time, it loads the checkpoint data as
expected; however, I see an NPE in the driver log.

 

Do you have any idea about the issue? I'm on Spark 1.4.0, thank you very
much!

 

== logs ==

 

15/06/15 15:27:17 [THREAD ID=main] INFO FileInputDStream: Restoring
checkpoint data

15/06/15 15:27:17 [THREAD ID=main] INFO
FileInputDStream$FileInputDStreamCheckpointData: Restoring files for
time 143435313 ms - []

15/06/15 15:27:17 [THREAD ID=main] INFO
FileInputDStream$FileInputDStreamCheckpointData: Restoring files for
time 143435314 ms - []

15/06/15 15:27:17 [THREAD ID=main] INFO
FileInputDStream$FileInputDStreamCheckpointData: Restoring files for
time 143435315 ms - []

15/06/15 15:27:17 [THREAD ID=main] INFO
FileInputDStream$FileInputDStreamCheckpointData: Restoring files for
time 143435316 ms - []

15/06/15 15:27:17 [THREAD ID=main] INFO
FileInputDStream$FileInputDStreamCheckpointData: Restoring files for
time 143435317 ms - []

15/06/15 15:27:17 [THREAD ID=main] INFO
FileInputDStream$FileInputDStreamCheckpointData: Restoring files for
time 143435318 ms - []

15/06/15 15:27:17 [THREAD ID=main] INFO
FileInputDStream$FileInputDStreamCheckpointData: Restoring files for
time 143435319 ms - []

15/06/15 15:27:17 [THREAD ID=main] INFO FileInputDStream: Restored
checkpoint data

15/06/15 15:27:17 [THREAD ID=main] INFO MappedDStream: Restored
checkpoint data

15/06/15 15:27:17 [THREAD ID=main] INFO MappedDStream: Restored
checkpoint data

15/06/15 15:27:17 [THREAD ID=main] INFO ForEachDStream: Restored
checkpoint data

15/06/15 15:27:17 [THREAD ID=main] INFO DStreamGraph: Restored
checkpoint data

15/06/15 15:27:17 [THREAD ID=main] ERROR StreamingContext: Error
starting the context, marking it as stopped

java.io.IOException: java.lang.NullPointerException

   at
org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1242)

   at
org.apache.spark.streaming.dstream.DStreamCheckpointData.writeObject(DSt
reamCheckpointData.scala:123)

   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

   at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.jav
a:57)

   at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessor
Impl.java:43)

   at java.lang.reflect.Method.invoke(Method.java:606)

   at
java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)

   at
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1495)

   at
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1
431)

   at
java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)

   at
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:15
47)

   at
java.io.ObjectOutputStream.defaultWriteObject(ObjectOutputStream.java:44
0)

   at
org.apache.spark.streaming.dstream.DStream$$anonfun$writeObject$1.apply$
mcV$sp(DStream.scala:498)

   at
org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1239)

   at
org.apache.spark.streaming.dstream.DStream.writeObject(DStream.scala:493
)

   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

   at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.jav
a:57)

   at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessor
Impl.java:43)

   at java.lang.reflect.Method.invoke(Method.java:606)

   at
java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)

   at
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1495)

   at
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1
431)

   at
java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)

   at
java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1377)

   at
java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1173)

   at
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:15
47)

   at
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)

   at
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1
431)

   at
java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)

   at
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:15
47)

   at
java.io.ObjectOutputStream.defaultWriteObject(ObjectOutputStream.java:44
0)

   at
org.apache.spark.streaming.DStreamGraph$$anonfun$writeObject$1.apply$mcV
$sp(DStreamGraph.scala:181)

   at
org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1239)

   at
org.apache.spark.streaming.DStreamGraph.writeObject(DStreamGraph.scala:1
76)

   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

   at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.jav
a:57)

   at
sun.reflect.Delega

Re: Spark DataFrame Reduce Job Took 40s for 6000 Rows

2015-06-15 Thread Akhil Das
Have a look here https://spark.apache.org/docs/latest/tuning.html

Thanks
Best Regards

On Mon, Jun 15, 2015 at 11:27 AM, Proust GZ Feng  wrote:

> Hi, Spark Experts
>
> I have played with Spark several weeks, after some time testing, a reduce
> operation of DataFrame cost 40s on a cluster with 5 datanode executors.
> And the back-end rows is about 6,000, is this a normal case? Such
> performance looks too bad because in Java a loop for 6,000 rows cause just
> several seconds
>
> I'm wondering any document I should read to make the job much more fast?
>
>
>
>
> Thanks in advance
> Proust
>