Re: Map-Side Join in Spark

2015-04-21 Thread ayan guha
If you are using a pair RDD, then you can use the partitionBy method to provide
your partitioner.
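For what it's worth, a minimal sketch of that idea (the variable names, key field and
partition count are illustrative, not from this thread):

import org.apache.spark.HashPartitioner

val partitioner = new HashPartitioner(100)
// Key both datasets the same way and give them the same partitioner,
// so the join below does not trigger an extra shuffle.
val left  = items.map(i => (i.itemId, i)).partitionBy(partitioner).cache()
val right = listings.map(l => (l.itemId, l)).partitionBy(partitioner).cache()
val joined = left.join(right)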
On 21 Apr 2015 15:04, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote:

 What is re-partition ?

 On Tue, Apr 21, 2015 at 10:23 AM, ayan guha guha.a...@gmail.com wrote:

 In my understanding you need to create a key out of the data and
 repartition both datasets to achieve map side join.
 On 21 Apr 2015 14:10, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote:

 Can someone share their working code of Map Side join in Spark + Scala.
 (No Spark-SQL)

 The only resource i could find was this (Open in chrome with Chinese to
 english translator)

 http://dongxicheng.org/framework-on-yarn/apache-spark-join-two-tables/



 --
 Deepak




 --
 Deepak




Re: Spark and accumulo

2015-04-21 Thread Akhil Das
You can simply use a custom InputFormat (AccumuloInputFormat) with the
Hadoop RDD APIs (sc.newAPIHadoopFile etc.) for that; all you need to do is
pass the job configuration. Here's a pretty clean discussion:
http://stackoverflow.com/questions/29244530/how-do-i-create-a-spark-rdd-from-accumulo-1-6-in-spark-notebook#answers-header
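A rough sketch of that wiring in Scala (the user, password, instance, ZooKeeper host and
table name are placeholders, and the static configuration methods are from the Accumulo 1.6
mapreduce API - double-check them against your Accumulo version):

import org.apache.accumulo.core.client.ClientConfiguration
import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat
import org.apache.accumulo.core.client.security.tokens.PasswordToken
import org.apache.accumulo.core.data.{Key, Value}
import org.apache.hadoop.mapreduce.Job

// Configure the InputFormat through a Hadoop Job, then hand its
// Configuration to newAPIHadoopRDD.
val job = Job.getInstance(sc.hadoopConfiguration)
AccumuloInputFormat.setConnectorInfo(job, "user", new PasswordToken("pass"))
AccumuloInputFormat.setZooKeeperInstance(job,
  ClientConfiguration.loadDefault().withInstance("instance").withZkHosts("zk1:2181"))
AccumuloInputFormat.setInputTableName(job, "my_table")

val rdd = sc.newAPIHadoopRDD(job.getConfiguration,
  classOf[AccumuloInputFormat], classOf[Key], classOf[Value])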

Thanks
Best Regards

On Tue, Apr 21, 2015 at 9:55 AM, madhvi madhvi.gu...@orkash.com wrote:

 Hi all,

 Is there anything to integrate Spark with Accumulo, or to make Spark
 process data stored in Accumulo?

 Thanks
 Madhvi Gupta

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




WebUI shows poor locality when task scheduling

2015-04-21 Thread eric wong
Hi,

When running an experimental KMeans job, the cached RDD is the
original points data.

I saw poor locality in the task details on the WebUI. Almost half of the
task input is Network instead of Memory.

A task with network input takes almost the same time as a task with
Hadoop (disk) input; compared with a memory-input task the difference is about 2x.
e.g
Task(Memory): 16s
Task(Network): 9s
Task(Hadoop): 9s


I see that fetching a 30MB RDD block from a remote node takes about 5 seconds in
the executor logs, like below:

15/03/31 04:08:52 INFO CoarseGrainedExecutorBackend: Got assigned task 58
15/03/31 04:08:52 INFO Executor: Running task 15.0 in stage 1.0 (TID 58)
15/03/31 04:08:52 INFO HadoopRDD: Input split:
hdfs://master:8000/kmeans/data-Kmeans-5.3g:2013265920+134217728
15/03/31 04:08:52 INFO BlockManager: Found block rdd_3_15 locally
15/03/31 04:08:58 INFO Executor: Finished task 15.0 in stage 1.0 (TID 58).
1920 bytes result sent to driver
15/03/31 04:08:58 INFO CoarseGrainedExecutorBackend: Got assigned task 60
-Task60
15/03/31 04:08:58 INFO Executor: Running task 17.0 in stage 1.0 (TID 60)
15/03/31 04:08:58 INFO HadoopRDD: Input split:
hdfs://master:8000/kmeans/data-Kmeans-5.3g:2281701376+134217728
15/03/31 04:09:02 INFO BlockManager: Found block rdd_3_17 remotely
15/03/31 04:09:12 INFO Executor: Finished task 17.0 in stage 1.0 (TID 60).
1920 bytes result sent to driver


So:
1) Does that mean I should cache the RDD with MEMORY_AND_DISK instead of
memory only?

2) And should I expand network capacity, or tune the scheduling locality parameters?


Any suggestion will be welcome.
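For reference, the two knobs being asked about look roughly like this (illustrative values;
in Spark 1.1.0 spark.locality.wait is given in milliseconds, default 3000):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

val conf = new SparkConf()
  .set("spark.locality.wait", "6000")  // wait longer for a local slot before scheduling a non-local task
val sc = new SparkContext(conf)

val points = sc.textFile("hdfs://master:8000/kmeans/data-Kmeans-5.3g")
points.persist(StorageLevel.MEMORY_AND_DISK)  // keep partitions that don't fit in memory on local disk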


--Env info---

Cluster: 4 workers, each with 1 core and 2G executor memory

Spark version: 1.1.0

Network: 30MB/s

Submit shell:
bin/spark-submit --class org.apache.spark.examples.mllib.JavaKMeans
--master spark://master:7077 --executor-memory 1g
lib/spark-examples-1.1.0-hadoop2.3.0.jar
hdfs://master:8000/kmeans/data-Kmeans-7g 8 1


Thanks very much, and please forgive my poor English.

-- 
Wang Haihua


Re: how to make a spark cluster ?

2015-04-21 Thread Reynold Xin
Actually if you only have one machine, just use the Spark local mode.

Just download the Spark tarball, untar it, set master to local[N], where N
= number of cores. You are good to go. There is no setup of job tracker or
Hadoop.
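Concretely, either start a shell with bin/spark-shell --master local[16], or set the master
in the application itself (the app name below is just a placeholder):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf().setAppName("my-app").setMaster("local[16]")
val sc = new SparkContext(conf)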


On Mon, Apr 20, 2015 at 3:21 PM, haihar nahak harihar1...@gmail.com wrote:

 Thank you :)

 On Mon, Apr 20, 2015 at 4:46 PM, Jörn Franke jornfra...@gmail.com wrote:

 Hi, if you have just one physical machine then I would try out Docker
 instead of a full VM (a full VM would be a waste of memory and CPU).

 Best regards
 Le 20 avr. 2015 00:11, hnahak harihar1...@gmail.com a écrit :

 Hi All,

 I have a big physical machine with 16 CPUs, 256 GB RAM, and a 20 TB hard disk. I
 just need to know the best way to set up a Spark cluster.

 If I need to process TBs of data, should I:
 1. Use only one machine, which contains the driver, executor, job tracker and task
 tracker, everything;
 2. create 4 VMs, each with 4 CPUs and 64 GB RAM; or
 3. create 8 VMs, each with 2 CPUs and 32 GB RAM?

 please give me your views/suggestions



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/how-to-make-a-spark-cluster-tp22563.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




 --
 {{{H2N}}}-(@:



Re: Understanding the build params for spark with sbt.

2015-04-21 Thread Akhil Das
With Maven you could do something like:

mvn -Dhadoop.version=2.3.0 -DskipTests clean package -pl core
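On the sbt side, -P selects Maven profiles (the Spark sbt build reads the same profiles
through sbt-pom-reader), and you can restrict the build to one sub-project with sbt's
project/task syntax - something along these lines should work, but treat it as a sketch:

build/sbt -Pyarn -Phadoop-2.3 -Dhadoop.version=2.3.0 core/package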


Thanks
Best Regards

On Mon, Apr 20, 2015 at 8:10 PM, Shiyao Ma i...@introo.me wrote:

 Hi.

 My usage only involves Spark core and HDFS, so no Spark SQL,
 MLlib, or other components are involved.


 I saw the hint on
 http://spark.apache.org/docs/latest/building-spark.html, with a sample
 like:
 build/sbt -Pyarn -Phadoop-2.3 assembly. (What is the -P for?)


 Fundamentally, I'd like sbt to compile and package only the core
 and the Hadoop integration.

 Meanwhile, it would be appreciated if you could tell me which
 Scala file controls the logic of -Pyarn, so that I can dig into
 the build source and have finer control.



 Thanks.

 --

 吾輩は猫である。ホームーページはhttp://introo.me。

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Map-Side Join in Spark

2015-04-21 Thread ๏̯͡๏
These are pair RDDs: (itemId, item) and (itemId, listing).

What do you mean by re-partitioning these RDDs?
And what do you mean by "your partitioner"?

Can you elaborate?
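For the "map-side join without Spark SQL" part of the question, one common pattern is to
broadcast the smaller pair RDD and join inside mapPartitions - a sketch, assuming items is
small enough to fit in memory (types and names are illustrative):

// items: RDD[(ItemId, Item)] (small), listings: RDD[(ItemId, Listing)] (large)
val itemsById = sc.broadcast(items.collectAsMap())

val joined = listings.mapPartitions { iter =>
  iter.flatMap { case (itemId, listing) =>
    // Look the key up in the broadcast map; no shuffle of the large side.
    itemsById.value.get(itemId).map(item => (itemId, (item, listing)))
  }
}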

On Tue, Apr 21, 2015 at 11:18 AM, ayan guha guha.a...@gmail.com wrote:

 If you are using a pairrdd, then you can use partition by method to
 provide your partitioner
 On 21 Apr 2015 15:04, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote:

 What is re-partition ?

 On Tue, Apr 21, 2015 at 10:23 AM, ayan guha guha.a...@gmail.com wrote:

 In my understanding you need to create a key out of the data and
 repartition both datasets to achieve map side join.
 On 21 Apr 2015 14:10, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote:

 Can someone share their working code of Map Side join in Spark + Scala.
 (No Spark-SQL)

 The only resource i could find was this (Open in chrome with Chinese to
 english translator)

 http://dongxicheng.org/framework-on-yarn/apache-spark-join-two-tables/



 --
 Deepak




 --
 Deepak




-- 
Deepak


Re: meet weird exception when studying rdd caching

2015-04-21 Thread Akhil Das
It could be a similar issue as
https://issues.apache.org/jira/browse/SPARK-4300

Thanks
Best Regards

On Tue, Apr 21, 2015 at 8:09 AM, donhoff_h 165612...@qq.com wrote:

 Hi,

 I am studying the RDD caching function and wrote a small program to verify
 it. I run the program in a Spark 1.3.0 environment on a YARN cluster, but
 I get a weird exception. It isn't always generated in the log; only
 sometimes do I see it, and it does not affect the output of my
 program. Could anyone explain why this happens and how to eliminate it?

 My program and the exception is listed in the following. Thanks very much
 for the help!

 Program:
 object TestSparkCaching01 {
   def main(args: Array[String]) {
     val conf = new SparkConf()
     conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
     conf.set("spark.kryo.registrationRequired", "true")
     conf.registerKryoClasses(Array(classOf[MyClass1], classOf[Array[MyClass1]]))
     val inFile = "hdfs://bgdt-dev-hrb/user/spark/tst/charset/A_utf8.txt"
     val sc = new SparkContext(conf)
     val rdd = sc.textFile(inFile)
     rdd.cache()
     rdd.map("Cache String: " + _).foreach(println)
     sc.stop()
   }
 }

 Exception:
 15/04/21 09:58:25 WARN channel.DefaultChannelPipeline: An exception was
 thrown by an exception handler.
 java.util.concurrent.RejectedExecutionException: Worker has already been
 shutdown
 at
 org.jboss.netty.channel.socket.nio.AbstractNioSelector.registerTask(AbstractNioSelector.java:120)
 at
 org.jboss.netty.channel.socket.nio.AbstractNioWorker.executeInIoThread(AbstractNioWorker.java:72)
 at
 org.jboss.netty.channel.socket.nio.NioWorker.executeInIoThread(NioWorker.java:36)
 at
 org.jboss.netty.channel.socket.nio.AbstractNioWorker.executeInIoThread(AbstractNioWorker.java:56)
 at
 org.jboss.netty.channel.socket.nio.NioWorker.executeInIoThread(NioWorker.java:36)
 at
 org.jboss.netty.channel.socket.nio.AbstractNioChannelSink.execute(AbstractNioChannelSink.java:34)
 at
 org.jboss.netty.channel.Channels.fireExceptionCaughtLater(Channels.java:496)
 at
 org.jboss.netty.channel.AbstractChannelSink.exceptionCaught(AbstractChannelSink.java:46)
 at
 org.jboss.netty.handler.codec.oneone.OneToOneEncoder.handleDownstream(OneToOneEncoder.java:54)
 at
 org.jboss.netty.channel.Channels.disconnect(Channels.java:781)
 at
 org.jboss.netty.channel.AbstractChannel.disconnect(AbstractChannel.java:211)
 at
 akka.remote.transport.netty.NettyTransport$$anonfun$gracefulClose$1.apply(NettyTransport.scala:223)
 at
 akka.remote.transport.netty.NettyTransport$$anonfun$gracefulClose$1.apply(NettyTransport.scala:222)
 at scala.util.Success.foreach(Try.scala:205)
 at
 scala.concurrent.Future$$anonfun$foreach$1.apply(Future.scala:204)
 at
 scala.concurrent.Future$$anonfun$foreach$1.apply(Future.scala:204)
 at
 scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
 at
 akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.processBatch$1(BatchingExecutor.scala:67)
 at
 akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:82)
 at
 akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59)
 at
 akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59)
 at
 scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
 at
 akka.dispatch.BatchingExecutor$Batch.run(BatchingExecutor.scala:58)
 at
 akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
 at
 akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
 at
 scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
 at
 scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
 at
 scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
 at
 scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
 15/04/21 09:58:25 INFO remote.RemoteActorRefProvider$RemotingTerminator:
 Remoting shut down.



Re: Custom paritioning of DSTream

2015-04-21 Thread Akhil Das
I think DStream.transform is the one that you are looking for.
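For example, something along these lines for a pair DStream (the partition count is arbitrary):

import org.apache.spark.HashPartitioner

// stream: DStream[(K, V)] - transform exposes each underlying RDD,
// so any Partitioner can be applied, not just a partition count.
val partitioned = stream.transform(rdd => rdd.partitionBy(new HashPartitioner(32)))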

Thanks
Best Regards

On Mon, Apr 20, 2015 at 9:42 PM, Evo Eftimov evo.efti...@isecc.com wrote:

 Is the only way to implement custom partitioning of a DStream via the
 foreach approach, so as to gain access to the actual RDDs comprising the
 DStream and hence their partitionBy method?

 DStream has only a repartition method accepting only the number of
 partitions, BUT not the method of partitioning.



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Custom-paritioning-of-DSTream-tp22574.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Configuring logging properties for executor

2015-04-21 Thread Michael Ryabtsev
Hi,

I would like to report on trying the first option proposed by Lan - putting
the log4j.properties file under the root of my application jar.
It doesn't look like it is working in my case: submitting the
application to Spark from the application code (not with spark-submit).
It seems that in this case the executor doesn't see the log4j.properties
that is located in the application jar and uses the default properties
file instead.
I conclude this from the fact that the log is not created, and that's
what I see in the executor console:

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties

I've also tried to add the below config to the application code, which
didn't have any apparent influence:

sparkConf.set("spark.executor.extraJavaOptions",
"-Dlog4j.configuration=log4j.properties");

On Mon, Apr 20, 2015 at 5:59 PM, Lan Jiang ljia...@gmail.com wrote:

 Rename your log4j_special.properties file to log4j.properties and place it
 under the root of your jar file, and you should be fine.

 If you are using Maven to build your jar, please place the log4j.properties in
 the src/main/resources folder.

 However, please note that if you have another dependency jar in the
 classpath that contains its own log4j.properties file, this might
 not work, since the first log4j.properties file that is loaded will be used.

 You can also do spark-submit --files log4j_special.properties ..., which
 should transfer your log4j properties file to the worker nodes automatically
 without you copying them manually.
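 Putting those two pieces together, the usual submit command looks something like this
 (class, jar and master are placeholders):

 spark-submit \
   --class com.example.MyApp \
   --master spark://master:7077 \
   --files log4j_special.properties \
   --conf "spark.executor.extraJavaOptions=-Dlog4j.configuration=log4j_special.properties" \
   my-application.jar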

 Lan


  On Apr 20, 2015, at 9:26 AM, Michael Ryabtsev michael...@gmail.com
 wrote:
 
  Hi all,
 
  I need to configure spark executor log4j.properties on a standalone
 cluster.
  It looks like placing the relevant properties file in the spark
  configuration folder and  setting the spark.executor.extraJavaOptions
 from
  my application code:
  sparkConf.set(spark.executor.extraJavaOptions,
  -Dlog4j.configuration=log4j_special.properties);
  does the work, and the executor logs are written in the required place
 and
  level. As far as I understand, it works, because the spark configuration
  folder is on the class path, and passing parameter without path works
 here.
  However, I would like to avoid deploying these properties to each worker
  spark configuration folder.
  I wonder, if I put the properties in my application jar, is there any
 way of
  telling executor to load them?
 
  Thanks,
 
 
 
  --
  View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Configuring-logging-properties-for-executor-tp22572.html
  Sent from the Apache Spark User List mailing list archive at Nabble.com.
 
  -
  To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
  For additional commands, e-mail: user-h...@spark.apache.org
 




Re: Spark Scala Version?

2015-04-21 Thread Dean Wampler
Without the rest of your code it's hard to make sense of errors. Why do you
need to use reflection?

Make sure you use the same Scala version throughout; 2.10.4 is
recommended. That's still the official version for Spark, even though
provisional support for 2.11 exists.

Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition
http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
Typesafe http://typesafe.com
@deanwampler http://twitter.com/deanwampler
http://polyglotprogramming.com

On Tue, Apr 21, 2015 at 12:26 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote:

 While running a my Spark Application over 1.3.0 with Scala 2.10.0 i
 encountered

 15/04/21 09:13:21 ERROR executor.Executor: Exception in task 7.0 in stage
 2.0 (TID 28)
 java.lang.UnsupportedOperationException: tail of empty list
 at scala.collection.immutable.Nil$.tail(List.scala:339)
 at scala.collection.immutable.Nil$.tail(List.scala:334)
 at scala.reflect.internal.SymbolTable.popPhase(SymbolTable.scala:172)


 My app has quite a lot of  reflection usage as i need to build avro
 schema. I was told on stack overflow that this error is because reflection
 is not thread safe in scala 2.10 and was advised to use scala 2.11

 Hence I moved to scala 2.11 and moved all spark maven dependencies to 2.11
 (spark-avro is still 2.10 as 2.11 is not available). I ran into this

 15/04/21 09:13:21 ERROR executor.Executor: Exception in task 7.0 in stage
 2.0 (TID 28)
 java.lang.UnsupportedOperationException: tail of empty list
 at scala.collection.immutable.Nil$.tail(List.scala:339)
 at scala.collection.immutable.Nil$.tail(List.scala:334)
 at scala.reflect.internal.SymbolTable.popPhase(SymbolTable.scala:172)


 Code:
 args.map {
   option =>
     {
       val pairs = option.split("=")
       arguments += pairs(0) -> pairs(1)
     }
 }

 Error is at ->. Does Spark use Scala 2.10 libs at runtime? Any
 suggestions for a fix?
 --
 Deepak




Re: Error while running SparkPi in Hadoop HA

2015-04-21 Thread Fernando O.
Solved

Looks like it's some incompatibility in the build when using -Phadoop-2.4;
I made the distribution with -Phadoop-provided and that fixed the issue.

On Tue, Apr 21, 2015 at 2:03 PM, Fernando O. fot...@gmail.com wrote:

 Hi all,

 I'm wondering if SparkPi works with hadoop HA (I guess it should)


 Hadoop's pi example works great on my cluster, so after having that done I 
 installed spark and in the worker log I'm seeing two problems that might be 
 related.


 Versions: Hadoop 2.6.0

   Spark 1.3.1


 I'm running :

 ./bin/spark-submit --class org.apache.spark.examples.SparkPi --master 
 yarn-cluster --num-executors 2 --driver-memory 2g 
 --executor-memory 1g --executor-cores 1 lib/spark-examples*.jar 10


 It seems like it can't see the ResourceManager, but I pinged and telnetted to it
 from the node and that works; also, Hadoop's pi example works...

 So I'm not sure why Spark is not seeing the RM.



 15/04/21 15:56:21 INFO yarn.YarnRMClient: Registering the ApplicationMaster
 15/04/21 15:56:22 INFO ipc.Client: Retrying connect to server:
 0.0.0.0/0.0.0.0:8030. Already tried 0 time(s);
 retry policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=10,
 sleepTime=1000 MILLISECONDS)
 15/04/21 15:56:23 INFO ipc.Client: Retrying connect to server:
 0.0.0.0/0.0.0.0:8030. Already tried 1 time(s);
 retry policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=10,
 sleepTime=1000 MILLISECONDS)
 15/04/21 15:56:24 INFO ipc.Client: Retrying connect to server: 
 0.0.0.0/0.0.0.0:8030. Already tried 2 time(s); retry policy is 
 RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000 
 MILLISECONDS)
 15/04/21 15:56:25 INFO ipc.Client: Retrying connect to server: 
 0.0.0.0/0.0.0.0:8030. Already tried 3 time(s); retry policy is 
 RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000 
 MILLISECONDS)
 15/04/21 15:56:26 INFO ipc.Client: Retrying connect to server: 
 0.0.0.0/0.0.0.0:8030. Already tried 4 time(s); retry policy is 
 RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000 
 MILLISECONDS)
 15/04/21 15:56:27 INFO ipc.Client: Retrying connect to server: 
 0.0.0.0/0.0.0.0:8030. Already tried 5 time(s); retry policy is 
 RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000 
 MILLISECONDS)
 15/04/21 15:56:28 INFO ipc.Client: Retrying connect to server: 
 0.0.0.0/0.0.0.0:8030. Already tried 6 time(s); retry policy is 
 RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000 
 MILLISECONDS)
 15/04/21 15:56:29 INFO ipc.Client: Retrying connect to server: 
 0.0.0.0/0.0.0.0:8030. Already tried 7 time(s); retry policy is 
 RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000 
 MILLISECONDS)
 15/04/21 15:56:30 INFO ipc.Client: Retrying connect to server: 
 0.0.0.0/0.0.0.0:8030. Already tried 8 time(s); retry policy is 
 RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000 
 MILLISECONDS)
 15/04/21 15:56:31 INFO ipc.Client: Retrying connect to server: 
 0.0.0.0/0.0.0.0:8030. Already tried 9 time(s); retry policy is 
 RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000 
 MILLISECONDS)
 15/04/21 15:56:51 INFO cluster.YarnClusterSchedulerBackend: SchedulerBackend 
 is ready for scheduling beginning after waiting 
 maxRegisteredResourcesWaitingTime: 3(ms)
 15/04/21 15:56:51 INFO cluster.YarnClusterScheduler: 
 YarnClusterScheduler.postStartHook done
 15/04/21 15:56:51 INFO spark.SparkContext: Starting job: reduce at 
 SparkPi.scala:35
 15/04/21 15:56:51 INFO scheduler.DAGScheduler: Got job 0 (reduce at 
 SparkPi.scala:35) with 10 output partitions (allowLocal=false)
 15/04/21 15:56:51 INFO scheduler.DAGScheduler: Final stage: Stage 0(reduce at 
 SparkPi.scala:35)
 15/04/21 15:56:51 INFO scheduler.DAGScheduler: Parents of final stage: List()
 15/04/21 15:56:51 INFO scheduler.DAGScheduler: Missing parents: List()
 15/04/21 15:56:51 INFO scheduler.DAGScheduler: Submitting Stage 0 
 (MapPartitionsRDD[1] at map at SparkPi.scala:31), which has no missing parents
 15/04/21 15:56:51 INFO cluster.YarnClusterScheduler: Cancelling stage 0
 15/04/21 15:56:51 INFO scheduler.DAGScheduler: Stage 0 (reduce at 
 SparkPi.scala:35) failed in Unknown s
 15/04/21 15:56:51 INFO scheduler.DAGScheduler: Job 0 failed: reduce at 
 SparkPi.scala:35, took 0.121974 s
 15/04/21 15:56:51 ERROR yarn.ApplicationMaster: User class threw exception: Job aborted due to stage 
 failure: Task serialization failed: 
 java.lang.reflect.InvocationTargetException
 sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
 sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
 sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
 java.lang.reflect.Constructor.newInstance(Constructor.java:526)
 

Re: SparkSQL performance

2015-04-21 Thread Michael Armbrust
Here is an example using rows directly:
https://spark.apache.org/docs/1.3.0/sql-programming-guide.html#programmatically-specifying-the-schema

Avro or parquet input would likely give you the best performance.
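For the archives, the Row-based path from that link boils down to something like this under
the 1.3 API (a sketch - the input RDD and field names are made up):

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

val schema = StructType(Seq(
  StructField("attribute1", IntegerType, nullable = false),
  StructField("name", StringType, nullable = true)))

// Build Rows directly instead of going through JavaBeans.
val rowRDD = rawRDD.map(r => Row(r.attribute1, r.name))
val df = sqlContext.createDataFrame(rowRDD, schema)
df.registerTempTable("tableX")
sqlContext.sql("SELECT * FROM tableX WHERE attribute1 BETWEEN 0 AND 5")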

On Tue, Apr 21, 2015 at 4:28 AM, Renato Marroquín Mogrovejo 
renatoj.marroq...@gmail.com wrote:

 Thanks for the hints guys! Much appreciated!
 Even if I just do something like:

 Select * from tableX where attribute1  5

 I see similar behaviour.

 @Michael
 Could you point me to any sample code that uses Spark's Rows? We are at a
 phase where we can actually change our JavaBeans for something that
 provides better performance than what we are seeing now. Would you
 recommend using an Avro representation then?
 Thanks again!


 Renato M.

 2015-04-21 1:18 GMT+02:00 Michael Armbrust mich...@databricks.com:

 There is a cost to converting from JavaBeans to Rows and this code path
 has not been optimized.  That is likely what you are seeing.

 On Mon, Apr 20, 2015 at 3:55 PM, ayan guha guha.a...@gmail.com wrote:

 SparkSQL optimizes better by column pruning and predicate pushdown,
 primarily. Here you are not taking advantage of either.

 I am curious to know what goes in your filter function, as you are not
 using a filter in SQL side.

 Best
 Ayan
 On 21 Apr 2015 08:05, Renato Marroquín Mogrovejo 
 renatoj.marroq...@gmail.com wrote:

 Does anybody have an idea? a clue? a hint?
 Thanks!


 Renato M.

 2015-04-20 9:31 GMT+02:00 Renato Marroquín Mogrovejo 
 renatoj.marroq...@gmail.com:

 Hi all,

 I have a simple query "Select * from tableX where attribute1 between 0
 and 5" that I run over a Kryo file with four partitions, which ends up being
 around 3.5 million rows in our case.
 If I run this query by doing a simple map().filter() it takes around
 ~9.6 seconds, but when I apply a schema, register the table into a SqlContext,
 and then run the query, it takes around ~16 seconds. This is using Spark
 1.2.1 with Scala 2.10.0.
 I am wondering why there is such a big gap in performance if it is
 just a filter. Internally, the relation files are mapped to a JavaBean.
 Could this different data representation (JavaBeans vs. SparkSQL's internal
 representation) lead to such a difference? Is there anything I could do
 to make the performance get closer to the hard-coded option?
 Thanks in advance for any suggestions or ideas.


 Renato M.







Spark Scala Version?

2015-04-21 Thread ๏̯͡๏
While running a my Spark Application over 1.3.0 with Scala 2.10.0 i
encountered

15/04/21 09:13:21 ERROR executor.Executor: Exception in task 7.0 in stage
2.0 (TID 28)
java.lang.UnsupportedOperationException: tail of empty list
at scala.collection.immutable.Nil$.tail(List.scala:339)
at scala.collection.immutable.Nil$.tail(List.scala:334)
at scala.reflect.internal.SymbolTable.popPhase(SymbolTable.scala:172)


My app has quite a lot of reflection usage, as I need to build an Avro schema.
I was told on Stack Overflow that this error occurs because reflection is not
thread safe in Scala 2.10, and was advised to use Scala 2.11.

Hence I moved to Scala 2.11 and moved all Spark Maven dependencies to 2.11
(spark-avro is still 2.10 as 2.11 is not available). I ran into this:

15/04/21 09:13:21 ERROR executor.Executor: Exception in task 7.0 in stage
2.0 (TID 28)
java.lang.UnsupportedOperationException: tail of empty list
at scala.collection.immutable.Nil$.tail(List.scala:339)
at scala.collection.immutable.Nil$.tail(List.scala:334)
at scala.reflect.internal.SymbolTable.popPhase(SymbolTable.scala:172)


Code:
args.map {
  option =>
    {
      val pairs = option.split("=")
      arguments += pairs(0) -> pairs(1)
    }
}

Error is at ->. Does Spark use Scala 2.10 libs at runtime? Any suggestions
for a fix?
-- 
Deepak


Re: Updating a Column in a DataFrame

2015-04-21 Thread Reynold Xin
You can use

df.withColumn("a", df.b)

to make column a have the same value as column b.
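If the goal is to "update" the values rather than copy a column, the same immutable-DataFrame
idea applies: derive a new frame, e.g. (a Scala sketch, column names illustrative):

// Recompute column "a" and carry "b" over unchanged; the result is a new DataFrame.
val updated = df.select((df("a") * 2).as("a"), df("b"))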


On Mon, Apr 20, 2015 at 3:38 PM, ARose ashley.r...@telarix.com wrote:

 In my Java application, I want to update the values of a Column in a given
 DataFrame. However, I realize DataFrames are immutable, and therefore
 cannot
 be updated by conventional means. Is there a workaround for this sort of
 transformation? If so, can someone provide an example?



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Updating-a-Column-in-a-DataFrame-tp22578.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Spark 1.3.1 Dataframe breaking ALS.train?

2015-04-21 Thread Xiangrui Meng
SchemaRDD subclasses RDD in 1.2, but DataFrame is no longer an RDD in
1.3. We should allow DataFrames in ALS.train. I will submit a patch.
You can use `ALS.train(training.rdd, ...)` for now as a workaround.
-Xiangrui

On Tue, Apr 21, 2015 at 10:51 AM, Joseph Bradley jos...@databricks.com wrote:
 Hi Ayan,

 If you want to use DataFrame, then you should use the Pipelines API
 (org.apache.spark.ml.*) which will take DataFrames:
 http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.ml.recommendation.ALS

 In the examples/ directory for ml/, you can find a MovieLensALS example.

 Good luck!
 Joseph

 On Tue, Apr 21, 2015 at 4:58 AM, ayan guha guha.a...@gmail.com wrote:

 Hi

 I am getting an error

 Also, I am getting an error in mlib.ALS.train function when passing
 dataframe (do I need to convert the DF to RDD?)

 Code:
 training = ssc.sql(select userId,movieId,rating from ratings where
 partitionKey  6).cache()
 print type(training)
 model = ALS.train(training,rank,numIter,lmbda)

 Error:
 class 'pyspark.sql.dataframe.DataFrame'

 Traceback (most recent call last):
   File D:\Project\Spark\code\movie_sql.py, line 109, in module
 bestConf = getBestModel(sc,ssc,training,validation,validationNoRating)
   File D:\Project\Spark\code\movie_sql.py, line 54, in getBestModel
 model = ALS.train(trainingRDD,rank,numIter,lmbda)
   File
 D:\spark\spark-1.3.1-bin-hadoop2.6\spark-1.3.1-bin-hadoop2.6\spark-1.3.1-bin-hadoop2.6\python\pyspark\mllib\recommendation.py,
 line 139, in train
 model = callMLlibFunc(trainALSModel, cls._prepare(ratings), rank,
 iterations,
   File
 D:\spark\spark-1.3.1-bin-hadoop2.6\spark-1.3.1-bin-hadoop2.6\spark-1.3.1-bin-hadoop2.6\python\pyspark\mllib\recommendation.py,
 line 127, in _prepare
 assert isinstance(ratings, RDD), ratings should be RDD
 AssertionError: ratings should be RDD

 It was working fine in 1.2.0 (till last night :))

 Any solution? I am thinking to map the training dataframe back to a RDD,
 byt will lose the schema information.

 Best
 Ayan

 On Mon, Apr 20, 2015 at 10:23 PM, ayan guha guha.a...@gmail.com wrote:

 Hi
 Just upgraded to Spark 1.3.1.

 I am getting an warning

 Warning (from warnings module):
   File
 D:\spark\spark-1.3.1-bin-hadoop2.6\spark-1.3.1-bin-hadoop2.6\spark-1.3.1-bin-hadoop2.6\python\pyspark\sql\context.py,
 line 191
 warnings.warn(inferSchema is deprecated, please use createDataFrame
 instead)
 UserWarning: inferSchema is deprecated, please use createDataFrame
 instead

 However, documentation still says to use inferSchema.
 Here: http://spark.apache.org/docs/latest/sql-programming-guide.htm in
 section

 Also, I am getting an error in mlib.ALS.train function when passing
 dataframe (do I need to convert the DF to RDD?)

 Code:
 training = ssc.sql(select userId,movieId,rating from ratings where
 partitionKey  6).cache()
 print type(training)
 model = ALS.train(training,rank,numIter,lmbda)

 Error:
 class 'pyspark.sql.dataframe.DataFrame'
 Rank:8 Lmbda:1.0 iteration:10

 Traceback (most recent call last):
   File D:\Project\Spark\code\movie_sql.py, line 109, in module
 bestConf =
 getBestModel(sc,ssc,training,validation,validationNoRating)
   File D:\Project\Spark\code\movie_sql.py, line 54, in getBestModel
 model = ALS.train(trainingRDD,rank,numIter,lmbda)
   File
 D:\spark\spark-1.3.1-bin-hadoop2.6\spark-1.3.1-bin-hadoop2.6\spark-1.3.1-bin-hadoop2.6\python\pyspark\mllib\recommendation.py,
 line 139, in train
 model = callMLlibFunc(trainALSModel, cls._prepare(ratings), rank,
 iterations,
   File
 D:\spark\spark-1.3.1-bin-hadoop2.6\spark-1.3.1-bin-hadoop2.6\spark-1.3.1-bin-hadoop2.6\python\pyspark\mllib\recommendation.py,
 line 127, in _prepare
 assert isinstance(ratings, RDD), ratings should be RDD
 AssertionError: ratings should be RDD

 --
 Best Regards,
 Ayan Guha




 --
 Best Regards,
 Ayan Guha



-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Join on DataFrames from the same source (Pyspark)

2015-04-21 Thread Michael Armbrust
This is https://issues.apache.org/jira/browse/SPARK-6231

Unfortunately this is pretty hard to fix, as it's hard for us to
differentiate these without aliases. However, you can add an alias as
follows:

from pyspark.sql.functions import *
df.alias("a").join(df.alias("b"), col("a.col1") == col("b.col1"))

On Tue, Apr 21, 2015 at 8:10 AM, Karlson ksonsp...@siberie.de wrote:

 Sorry, my code actually was

 df_one = df.select('col1', 'col2')
 df_two = df.select('col1', 'col3')

 But in Spark 1.4.0 this does not seem to make any difference anyway and
 the problem is the same with both versions.



 On 2015-04-21 17:04, ayan guha wrote:

 your code should be

  df_one = df.select('col1', 'col2')
  df_two = df.select('col1', 'col3')

 Your current code is generating a tuple, and of course df_1 and df_2 are
 different, so the join is yielding a cartesian product.

 Best
 Ayan

 On Wed, Apr 22, 2015 at 12:42 AM, Karlson ksonsp...@siberie.de wrote:

  Hi,

 can anyone confirm (and if so elaborate on) the following problem?

 When I join two DataFrames that originate from the same source DataFrame,
 the resulting DF will explode to a huge number of rows. A quick example:

 I load a DataFrame with n rows from disk:

 df = sql_context.parquetFile('data.parquet')

 Then I create two DataFrames from that source.

 df_one = df.select(['col1', 'col2'])
 df_two = df.select(['col1', 'col3'])

 Finally I want to (inner) join them back together:

 df_joined = df_one.join(df_two, df_one['col1'] == df_two['col2'],
 'inner')

 The key in col1 is unique. The resulting DataFrame should have n rows,
 however it does have n*n rows.

 That does not happen, when I load df_one and df_two from disk directly. I
 am on Spark 1.3.0, but this also happens on the current 1.4.0 snapshot.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org



 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Column renaming after DataFrame.groupBy

2015-04-21 Thread Reynold Xin
You can use the more verbose syntax:

d.groupBy("_1").agg(d("_1"), sum("_1").as("sum_1"), sum("_2").as("sum_2"))

On Tue, Apr 21, 2015 at 1:06 AM, Justin Yip yipjus...@prediction.io wrote:

 Hello,

 I would like to rename a column after aggregation. In the following code, the
 column name is "SUM(_1#179)"; is there a way to rename it to a more
 friendly name?

 scala val d = sqlContext.createDataFrame(Seq((1, 2), (1, 3), (2, 10)))
 scala d.groupBy(_1).sum().printSchema
 root
  |-- _1: integer (nullable = false)
  |-- SUM(_1#179): long (nullable = true)
  |-- SUM(_2#180): long (nullable = true)

 Thanks.

 Justin

 --
 View this message in context: Column renaming after DataFrame.groupBy
 http://apache-spark-user-list.1001560.n3.nabble.com/Column-renaming-after-DataFrame-groupBy-tp22586.html
 Sent from the Apache Spark User List mailing list archive
 http://apache-spark-user-list.1001560.n3.nabble.com/ at Nabble.com.



Re: Spark 1.3.1 Dataframe breaking ALS.train?

2015-04-21 Thread Joseph Bradley
Hi Ayan,

If you want to use DataFrame, then you should use the Pipelines API
(org.apache.spark.ml.*) which will take DataFrames:
http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.ml.recommendation.ALS

In the examples/ directory for ml/, you can find a MovieLensALS example.

Good luck!
Joseph

On Tue, Apr 21, 2015 at 4:58 AM, ayan guha guha.a...@gmail.com wrote:

 Hi

 I am getting an error

 Also, I am getting an error in mlib.ALS.train function when passing
 dataframe (do I need to convert the DF to RDD?)

 Code:
 training = ssc.sql(select userId,movieId,rating from ratings where
 partitionKey  6).cache()
 print type(training)
 model = ALS.train(training,rank,numIter,lmbda)

 Error:
 class 'pyspark.sql.dataframe.DataFrame'

 Traceback (most recent call last):
   File D:\Project\Spark\code\movie_sql.py, line 109, in module
 bestConf = getBestModel(sc,ssc,training,validation,validationNoRating)
   File D:\Project\Spark\code\movie_sql.py, line 54, in getBestModel
 model = ALS.train(trainingRDD,rank,numIter,lmbda)
   File
 D:\spark\spark-1.3.1-bin-hadoop2.6\spark-1.3.1-bin-hadoop2.6\spark-1.3.1-bin-hadoop2.6\python\pyspark\mllib\recommendation.py,
 line 139, in train
 model = callMLlibFunc(trainALSModel, cls._prepare(ratings), rank,
 iterations,
   File
 D:\spark\spark-1.3.1-bin-hadoop2.6\spark-1.3.1-bin-hadoop2.6\spark-1.3.1-bin-hadoop2.6\python\pyspark\mllib\recommendation.py,
 line 127, in _prepare
 assert isinstance(ratings, RDD), ratings should be RDD
 AssertionError: ratings should be RDD

 It was working fine in 1.2.0 (till last night :))

 Any solution? I am thinking to map the training dataframe back to a RDD,
 byt will lose the schema information.

 Best
 Ayan

 On Mon, Apr 20, 2015 at 10:23 PM, ayan guha guha.a...@gmail.com wrote:

 Hi
 Just upgraded to Spark 1.3.1.

 I am getting an warning

 Warning (from warnings module):
   File
 D:\spark\spark-1.3.1-bin-hadoop2.6\spark-1.3.1-bin-hadoop2.6\spark-1.3.1-bin-hadoop2.6\python\pyspark\sql\context.py,
 line 191
 warnings.warn(inferSchema is deprecated, please use createDataFrame
 instead)
 UserWarning: inferSchema is deprecated, please use createDataFrame instead

 However, documentation still says to use inferSchema.
 Here: http://spark.apache.org/docs/latest/sql-programming-guide.htm in
 section

 Also, I am getting an error in mlib.ALS.train function when passing
 dataframe (do I need to convert the DF to RDD?)

 Code:
 training = ssc.sql(select userId,movieId,rating from ratings where
 partitionKey  6).cache()
 print type(training)
 model = ALS.train(training,rank,numIter,lmbda)

 Error:
 class 'pyspark.sql.dataframe.DataFrame'
 Rank:8 Lmbda:1.0 iteration:10

 Traceback (most recent call last):
   File D:\Project\Spark\code\movie_sql.py, line 109, in module
 bestConf = getBestModel(sc,ssc,training,validation,validationNoRating)
   File D:\Project\Spark\code\movie_sql.py, line 54, in getBestModel
 model = ALS.train(trainingRDD,rank,numIter,lmbda)
   File
 D:\spark\spark-1.3.1-bin-hadoop2.6\spark-1.3.1-bin-hadoop2.6\spark-1.3.1-bin-hadoop2.6\python\pyspark\mllib\recommendation.py,
 line 139, in train
 model = callMLlibFunc(trainALSModel, cls._prepare(ratings), rank,
 iterations,
   File
 D:\spark\spark-1.3.1-bin-hadoop2.6\spark-1.3.1-bin-hadoop2.6\spark-1.3.1-bin-hadoop2.6\python\pyspark\mllib\recommendation.py,
 line 127, in _prepare
 assert isinstance(ratings, RDD), ratings should be RDD
 AssertionError: ratings should be RDD

 --
 Best Regards,
 Ayan Guha




 --
 Best Regards,
 Ayan Guha



Re: Instantiating/starting Spark jobs programmatically

2015-04-21 Thread Steve Loughran

On 21 Apr 2015, at 17:34, Richard Marscher rmarsc...@localytics.com wrote:

- There are System.exit calls built into Spark as of now that could kill your 
running JVM. We have shadowed some of the most offensive bits within our own 
application to work around this. You'd likely want to do that or to do your own 
Spark fork. For example, if the SparkContext can't connect to your cluster 
master node when it is created, it will System.exit.

People can block errant System.exit calls by running under a SecurityManager.
Less than ideal (and there's a small performance hit), but possible.
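A bare-bones sketch of that idea (not production-grade; a real one would whitelist callers
instead of blocking every exit):

// Install before creating the SparkContext; turns exit attempts into exceptions.
System.setSecurityManager(new SecurityManager {
  override def checkExit(status: Int): Unit =
    throw new SecurityException(s"Intercepted System.exit($status)")
  override def checkPermission(perm: java.security.Permission): Unit = ()  // permit everything else
})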


Re: Spark Unit Testing

2015-04-21 Thread James King
Hi Emre, thanks for the help will have a look. Cheers!

On Tue, Apr 21, 2015 at 1:46 PM, Emre Sevinc emre.sev...@gmail.com wrote:

 Hello James,

 Did you check the following resources:

  -
 https://github.com/apache/spark/tree/master/streaming/src/test/java/org/apache/spark/streaming

  -
 http://www.slideshare.net/databricks/strata-sj-everyday-im-shuffling-tips-for-writing-better-spark-programs
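 One pattern that is often enough for unit tests is to feed a queue of pre-built RDDs into
 the streaming context - a Scala sketch (JavaStreamingContext has an equivalent queueStream
 for the Java API):

 import scala.collection.mutable
 import org.apache.spark.streaming.{Seconds, StreamingContext}

 val ssc = new StreamingContext(sc, Seconds(1))
 val queue = mutable.Queue(sc.parallelize(Seq(("k1", "v1"), ("k2", "v2"))))
 val pairStream = ssc.queueStream(queue)  // a DStream[(String, String)] to drive the code under test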

 --
 Emre Sevinç
 http://www.bigindustries.be/


 On Tue, Apr 21, 2015 at 1:26 PM, James King jakwebin...@gmail.com wrote:

 I'm trying to write some unit tests for my Spark code.

 I need to pass a JavaPairDStream<String, String> to my Spark class.

 Is there a way to create a JavaPairDStream using the Java API?

 Also, is there a good resource that covers an approach (or approaches) for
 unit testing using Java?

 Regards
 jk




 --
 Emre Sevinc



Column renaming after DataFrame.groupBy

2015-04-21 Thread Justin Yip
Hello,

I would like to rename a column after aggregation. In the following code, the
column name is "SUM(_1#179)"; is there a way to rename it to a more
friendly name?

scala val d = sqlContext.createDataFrame(Seq((1, 2), (1, 3), (2, 10)))
scala d.groupBy(_1).sum().printSchema
root
 |-- _1: integer (nullable = false)
 |-- SUM(_1#179): long (nullable = true)
 |-- SUM(_2#180): long (nullable = true)

Thanks.

Justin




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Column-renaming-after-DataFrame-groupBy-tp22586.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Re: Compression and Hive with Spark 1.3

2015-04-21 Thread Ophir Cohen
Some more info:
I'm putting the compression values in hive-site.xml and running the Spark job.
hc.sql(set ) returns the expected (compression) configuration, but
looking at the logs, it creates the tables without compression:
15/04/21 13:14:19 INFO metastore.HiveMetaStore: 0: create_table:
Table(tableName:core_secm_instr_21042015_131411_tmp, dbName:default,
owner:hadoop, createTime:1429622059, lastAccessTime:0, retention:0,
sd:StorageDescriptor(cols:[FieldSchema(name:col, type:arraystring,
comment:from deserializer)], location:null,
inputFormat:org.apache.hadoop.mapred.SequenceFileInputFormat,
outputFormat:org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat,
compressed:false, numBuckets:-1, serdeInfo:SerDeInfo(name:null,
serializationLib:org.apache.hadoop.hive.serde2.MetadataTypedColumnsetSerDe,
parameters:{serialization.format=1, path=hdfs://
10.166.157.97:9000/user/hive/warehouse/core_secm_instr_21042015_131411_tmp}),
bucketCols:[], sortCols:[], parameters:{},
skewedInfo:SkewedInfo(skewedColNames:[], skewedColValues:[],
skewedColValueLocationMaps:{})), partitionKeys:[],
parameters:{spark.sql.sources.schema.part.0={type:struct,fields:[{name:sid,type:integer,nullable:true,metadata:{}},{name:typeid,type:integer,nullable:true,metadata:{}},{name:symbol,type:string,nullable:true,metadata:{}},{name:name,type:string,nullable:true,metadata:{}},{name:beginDT,type:long,nullable:true,metadata:{}},{name:endDT,type:long,nullable:true,metadata:{}}]},
EXTERNAL=FALSE, spark.sql.sources.schema.numParts=1,
spark.sql.sources.provider=org.apache.spark.sql.parquet},
viewOriginalText:null, viewExpandedText:null, tableType:MANAGED_TABLE)
15/04/21 13:14:19 INFO HiveMetaStore.audit: ugi=hadoop
ip=unknown-ip-addr  cmd=create_table:
Table(tableName:core_secm_instr_21042015_131411_tmp, dbName:default,
owner:hadoop, createTime:1429622059, lastAccessTime:0, retention:0,
sd:StorageDescriptor(cols:[FieldSchema(name:col, type:arraystring,
comment:from deserializer)], location:null,
inputFormat:org.apache.hadoop.mapred.SequenceFileInputFormat,
outputFormat:org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat,
compressed:false, numBuckets:-1, serdeInfo:SerDeInfo(name:null,
serializationLib:org.apache.hadoop.hive.serde2.MetadataTypedColumnsetSerDe,
parameters:{serialization.format=1, path=hdfs://
10.166.157.97:9000/user/hive/warehouse/core_secm_instr_21042015_131411_tmp}),
bucketCols:[], sortCols:[], parameters:{},
skewedInfo:SkewedInfo(skewedColNames:[], skewedColValues:[],
skewedColValueLocationMaps:{})), partitionKeys:[],
parameters:{spark.sql.sources.schema.part.0={type:struct,fields:[{name:sid,type:integer,nullable:true,metadata:{}},{name:typeid,type:integer,nullable:true,metadata:{}},{name:symbol,type:string,nullable:true,metadata:{}},{name:name,type:string,nullable:true,metadata:{}},{name:beginDT,type:long,nullable:true,metadata:{}},{name:endDT,type:long,nullable:true,metadata:{}}]},
EXTERNAL=FALSE, spark.sql.sources.schema.numParts=1,
spark.sql.sources.provider=org.apache.spark.sql.parquet},
viewOriginalText:null, viewExpandedText:null, tableType:MANAGED_TABLE)



On Tue, Apr 21, 2015 at 12:40 PM, Ophir Cohen oph...@gmail.com wrote:

 Sadly I'm encountering too many issues migrating my code to Spark 1.3.

 I wrote about one problem in another mail, but my main problem is that I can't set
 the right compression type.
 In Spark 1.2.1 setting the following values was enough:
 hc.setConf("hive.exec.compress.output", "true")
 hc.setConf("mapreduce.output.fileoutputformat.compress.codec",
 "org.apache.hadoop.io.compress.SnappyCodec")
 hc.setConf("mapreduce.output.fileoutputformat.compress.type", "BLOCK")

 Running it on the new cluster:
 1. I get the files uncompressed and named *.parquet
 2. When trying to explore them using the Hive CLI I get the following exception:

 Failed with exception java.io.IOException: java.io.IOException:
 hdfs://10.166.157.97:9000/user/hive/warehouse/core_equity_corp_splits_divs/part-r-1.parquet
 not a SequenceFile
 Time taken: 0.538 seconds

 3. Running the same query from the Spark shell yields empty results.

 Please advise





Re: Shuffle files not cleaned up (Spark 1.2.1)

2015-04-21 Thread N B
We already have a cron job in place to clean just the shuffle files.
However, what I would really like to know is whether there is a proper
way of telling Spark to clean up these files once it is done with them.

Thanks
NB
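For the record, the related knobs that exist in this version are roughly these (illustrative
values in seconds; semantics differ between versions, so check the configuration docs before
relying on them):

// Driver side: periodically forget metadata/shuffle data older than the TTL.
conf.set("spark.cleaner.ttl", "43200")
// Standalone workers: clean up directories left by finished applications.
conf.set("spark.worker.cleanup.enabled", "true")
conf.set("spark.worker.cleanup.appDataTtl", "43200")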


On Mon, Apr 20, 2015 at 10:47 AM, Jeetendra Gangele gangele...@gmail.com
wrote:

 Write a crone job for this like below

 12 * * * *  find $SPARK_HOME/work -cmin +1440 -prune -exec rm -rf {} \+
 32 * * * *  find /tmp -type d -cmin +1440 -name spark-*-*-* -prune -exec
 rm -rf {} \+
 52 * * * *  find $SPARK_LOCAL_DIR -mindepth 1 -maxdepth 1 -type d -cmin
 +1440 -name spark-*-*-* -prune -exec rm -rf {} \+


 On 20 April 2015 at 23:12, N B nb.nos...@gmail.com wrote:

 Hi all,

 I had posed this query as part of a different thread but did not get a
 response there. So creating a new thread hoping to catch someone's
 attention.

 We are experiencing this issue of shuffle files being left behind and not
 being cleaned up by Spark. Since this is a Spark streaming application, it
 is expected to stay up indefinitely, so shuffle files not being cleaned up
 is a big problem right now. Our max window size is 6 hours, so we have set
 up a cron job to clean up shuffle files older than 12 hours otherwise it
 will eat up all our disk space.

 Please see the following. It seems the non-cleaning of shuffle files is
 being documented in 1.3.1.

 https://github.com/apache/spark/pull/5074/files
 https://issues.apache.org/jira/browse/SPARK-5836


 Also, for some reason, the following JIRAs that were reported as
 functional issues were closed as Duplicates of the above Documentation bug.
 Does this mean that this issue won't be tackled at all?

 https://issues.apache.org/jira/browse/SPARK-3563
 https://issues.apache.org/jira/browse/SPARK-4796
 https://issues.apache.org/jira/browse/SPARK-6011

 Any further insight into whether this is being looked into and meanwhile
 how to handle shuffle files will be greatly appreciated.

 Thanks
 NB








Re: Running spark over HDFS

2015-04-21 Thread madhvi

On Tuesday 21 April 2015 12:12 PM, Akhil Das wrote:

Your spark master should be spark://swetha:7077 :)
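i.e., in the SparkConf (use the exact host name shown at the top of the master web UI; the
app name is a placeholder):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("hdfs-processing")
  .setMaster("spark://swetha:7077")
val sc = new SparkContext(conf)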

Thanks
Best Regards

On Mon, Apr 20, 2015 at 2:44 PM, madhvi madhvi.gu...@orkash.com wrote:


PFA screenshot of my cluster UI

Thanks
On Monday 20 April 2015 02:27 PM, Akhil Das wrote:

Are you seeing your task being submitted to the UI? Under
completed or running tasks? How much resources are you allocating
for your job? Can you share a screenshot of your cluster UI and
the code snippet that you are trying to run?

Thanks
Best Regards

On Mon, Apr 20, 2015 at 12:37 PM, madhvi madhvi.gu...@orkash.com wrote:

Hi,

I did what you told me, but now it is giving the following error:
ERROR TaskSchedulerImpl: Exiting due to error from cluster
scheduler: All masters are unresponsive! Giving up.

On UI it is showing that master is working

Thanks
Madhvi

On Monday 20 April 2015 12:28 PM, Akhil Das wrote:

In your eclipse, while you create your SparkContext, set the
master uri as shown in the web UI's top left corner like:
spark://someIPorHost:7077 and it should be fine.

Thanks
Best Regards

On Mon, Apr 20, 2015 at 12:22 PM, madhvi madhvi.gu...@orkash.com wrote:

Hi All,

I am new to spark and have installed spark cluster over
my system having hadoop cluster.I want to process data
stored in HDFS through spark.

When I am running code in eclipse it is giving the
following warning repeatedly:
scheduler.TaskSchedulerImpl: Initial job has not
accepted any resources; check your cluster UI to ensure
that workers are registered and have sufficient resources.

I have made changes to spark-env.sh file as below
export SPARK_WORKER_INSTANCES=1
export HADOOP_CONF_DIR=/root/Documents/hadoop/etc/hadoop
export SPARK_WORKER_MEMORY=1g
export SPARK_WORKER_CORES=2
export SPARK_EXECUTOR_MEMORY=1g

I am running the spark standalone cluster.In cluster UI
it is showing all workers with allocated resources but
still its not working.
what other configurations are needed to be changed?

Thanks
Madhvi Gupta


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org









-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Thanks Akhil,

It worked fine after replacing the IP with the hostname and running the code
as a jar submitted with spark-submit.


Madhvi


Re: Parquet Hive table become very slow on 1.3?

2015-04-21 Thread Rex Xiong
We have a similar issue with massive Parquet files. Cheng Lian, could you
have a look?

2015-04-08 15:47 GMT+08:00 Zheng, Xudong dong...@gmail.com:

 Hi Cheng,

 I tried both of these patches, and it seems they still do not resolve my issue. I
 found that most of the time is spent on this line in newParquet.scala:

 ParquetFileReader.readAllFootersInParallel(
   sparkContext.hadoopConfiguration, seqAsJavaList(leaves),
 taskSideMetaData)

 This needs to read all the files under the Parquet folder, and our Parquet
 folder has a lot of Parquet files (nearly 2000); reading one file takes about 2
 seconds, so it becomes very slow ... And PR 5231 does not skip this step,
 so it does not resolve my issue.

 As our Parquet files are generated by a Spark job, the number of
 .parquet files is the same as the number of tasks; that is why we have so
 many files. But these files actually have the same schema. Is there any way
 to merge these files into one, or to avoid scanning each of them?

 On Sat, Apr 4, 2015 at 9:47 PM, Cheng Lian lian.cs@gmail.com wrote:

  Hey Xudong,

 We had been digging this issue for a while, and believe PR 5339
 http://github.com/apache/spark/pull/5339 and PR 5334
 http://github.com/apache/spark/pull/5339 should fix this issue.

 There are two problems:

 1. Normally we cache Parquet table metadata for better performance, but
 when converting Hive metastore Hive tables, the cache is not used. Thus
 heavy operations like schema discovery is done every time a metastore
 Parquet table is converted.
 2. With Parquet task side metadata reading (which is turned on by
 default), we can actually skip the row group information in the footer.
 However, we accidentally called a Parquet function which doesn't skip row
 group information.

 For your question about schema merging: Parquet allows different
 part-files to have different but compatible schemas. For example,
 part-1.parquet has columns a and b, while part-2.parquet may have
 columns a and c. In some cases, the summary files (_metadata and
 _common_metadata) contain the merged schema (a, b, and c), but that's not
 guaranteed. For example, when the user-defined metadata stored in different
 part-files contains different values for the same key, Parquet simply gives
 up writing summary files. That's why all part-files must be touched to get
 a precise merged schema.

 However, in scenarios where a centralized arbitrative schema is available
 (e.g. Hive metastore schema, or the schema provided by user via data source
 DDL), we don't need to do schema merging on driver side, but defer it to
 executor side and each task only needs to reconcile those part-files it
 needs to touch. This is also what the Parquet developers did recently for
 parquet-hadoop https://github.com/apache/incubator-parquet-mr/pull/45.

 Cheng


 On 3/31/15 11:49 PM, Zheng, Xudong wrote:

 Thanks Cheng!

 Setting 'spark.sql.parquet.useDataSourceApi' to false resolves my issue,
 but the PR 5231 does not seem to. Not sure what else I did wrong ...

 BTW, we are actually very interested in the schema merging feature in
 Spark 1.3, so both of these two solutions will disable this feature, right? It
 seems that Parquet metadata is stored in a file named _metadata in the
 Parquet file folder (each folder is a partition, as we use a partitioned table),
 so why do we need to scan all Parquet part-files? Is there any other solution that could
 keep the schema merging feature at the same time? We really like this
 feature :)

 On Tue, Mar 31, 2015 at 3:19 PM, Cheng Lian lian.cs@gmail.com
 wrote:

  Hi Xudong,

 This is probably because Parquet schema merging is turned on by
 default. This is generally useful for Parquet files with different but
 compatible schemas. But it needs to read metadata from all Parquet
 part-files. This can be problematic when reading Parquet files with lots of
 part-files, especially when the user doesn't need schema merging.

 This issue is tracked by SPARK-6575, and here is a PR for it:
 https://github.com/apache/spark/pull/5231. This PR adds a configuration
 to disable schema merging by default when doing Hive metastore Parquet
 table conversion.

 Another workaround is to fallback to the old Parquet code by setting
 spark.sql.parquet.useDataSourceApi to false.
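 For example (sketches of the two ways to set that flag):

 sqlContext.setConf("spark.sql.parquet.useDataSourceApi", "false")

 // or at submit time:
 // spark-submit --conf spark.sql.parquet.useDataSourceApi=false ...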

 Cheng


 On 3/31/15 2:47 PM, Zheng, Xudong wrote:

 Hi all,

 We are using a Parquet Hive table, and we are upgrading to Spark 1.3.
 But we find that just a simple COUNT(*) query is much slower (100x) than in
 Spark 1.2.

 I find that most of the time is spent on the driver getting HDFS blocks. I see a large
 number of logs like the below printed:

  15/03/30 23:03:43 DEBUG ProtobufRpcEngine: Call: getBlockLocations took 
 2097ms
 15/03/30 23:03:43 DEBUG DFSClient: newInfo = LocatedBlocks{
   fileLength=77153436
   underConstruction=false
   
 blocks=[LocatedBlock{BP-1236294426-10.152.90.181-1425290838173:blk_1075187948_1448275;
  getBlockSize()=77153436; corrupt=false; offset=0; 
 locs=[10.152.116.172:50010, 10.152.116.169:50010, 10.153.125.184:50010]}]
   
 

Features scaling

2015-04-21 Thread Denys Kozyr
Hi!

I want to normalize features before training logistic regression. I set up the scaler:

scaler2 = StandardScaler(withMean=True, withStd=True).fit(features)

and apply it to a dataset:

scaledData = dataset.map(lambda x: LabeledPoint(x.label,
scaler2.transform(Vectors.dense(x.features.toArray()))))

but I can't work with scaledData (can't output it or train a regression
on it); I get an error:

Exception: It appears that you are attempting to reference SparkContext from a
broadcast variable, action, or transforamtion. SparkContext can only be used on
the driver, not in code that it run on workers. For more information, see
SPARK-5063.

Is this the correct code for normalization? Why doesn't it work?
Any advice is welcome.
Thanks.

Full code:
https://gist.github.com/dkozyr/d31551a3ebed0ee17772

Console output:
https://gist.github.com/dkozyr/199f0d4f44cf522f9453

Denys

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Can't get SparkListener to work

2015-04-21 Thread Shixiong Zhu
You need to call sc.stop() to wait for the notifications to be processed.
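i.e., in the test program quoted below, the sleep can be replaced with a stop (sketch):

try {
  foo(sc)
} finally {
  sc.stop()  // drains the listener bus, so onTaskStart/onTaskEnd are delivered before the JVM exits
}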

Best Regards,
Shixiong(Ryan) Zhu

2015-04-21 4:18 GMT+08:00 Praveen Balaji secondorderpolynom...@gmail.com:

 Thanks Shixiong. I tried it out and it works.

 If you're looking at this post, here a few points you may be interested in:

 Turns out this has to do with the difference between methods and
 functions in Scala - something I didn't pay attention to before. If you're
 looking at this thread, this may be an interesting post:

 http://jim-mcbeath.blogspot.com/2009/05/scala-functions-vs-methods.html

 Below is some test code. I added the Thread.sleep because it looks like
 Spark notifications happen asynchronously and the main/driver thread won't
 wait for the notifications to complete. I'll look at that further later,
 but for now that's my inference, so don't take my word for it yet. Here's
 the code:

 object TestME {
   def main(args: Array[String]): Unit = {
     val conf = new SparkConf().setAppName("testme");
     val sc = new SparkContext(conf);
     try {
       foo(sc);
     } finally {
       Thread.sleep(2000);
     }
   }

   def foo(sc: SparkContext) = {
     sc.addSparkListener(new SparkListener() {
       override def onTaskStart(e: SparkListenerTaskStart) = println("onTaskStart");
       override def onTaskEnd(e: SparkListenerTaskEnd) = println("onTaskEnd");
     });

     sc.parallelize(List(1, 2, 3)).map(i => throw new
 SparkException("test")).collect();
   }
 }

 I'm running it from Eclipse on local[*].



 On Sun, Apr 19, 2015 at 7:57 PM, Praveen Balaji 
 secondorderpolynom...@gmail.com wrote:

 Thanks Shixiong. I'll try this.

 On Sun, Apr 19, 2015, 7:36 PM Shixiong Zhu zsxw...@gmail.com wrote:

 The problem is the code you use to test:


 sc.parallelize(List(1, 2, 3)).map(throw new
 SparkException("test")).collect();

 is like the following example:

 def foo: Int => Nothing = {
   throw new SparkException("test")
 }
 sc.parallelize(List(1, 2, 3)).map(foo).collect();

 So the Spark job is actually never submitted, since it fails in `foo`,
 which is evaluated to create the map function.

 Change it to

 sc.parallelize(List(1, 2, 3)).map(i => throw new
 SparkException("test")).collect();

 And you will see the correct messages from your listener.



 Best Regards,
 Shixiong(Ryan) Zhu

 2015-04-19 1:06 GMT+08:00 Praveen Balaji 
 secondorderpolynom...@gmail.com:

 Thanks for the response, Archit. I get callbacks when I do not throw an
 exception from map.
 My use case, however, is to get callbacks for exceptions in
 transformations on executors. Do you think I'm going down the right route?

 Cheers
 -p

 On Sat, Apr 18, 2015 at 1:49 AM, Archit Thakur 
 archit279tha...@gmail.com wrote:

 Hi Praveen,
  Can you try once after removing the throw exception in map? Do you still not
  get it?
 On Apr 18, 2015 8:14 AM, Praveen Balaji 
 secondorderpolynom...@gmail.com wrote:

 Thanks for the response, Imran. I probably chose the wrong methods
 for this email. I implemented all methods of SparkListener and the only
 callback I get is onExecutorMetricsUpdate.

 Here's the complete code:

 ==

 import org.apache.spark.scheduler._

 sc.addSparkListener(new SparkListener() {
   override def onStageCompleted(e: SparkListenerStageCompleted) =
 println("onStageCompleted");
   override def onStageSubmitted(e: SparkListenerStageSubmitted) =
 println("onStageSubmitted");
   override def onTaskStart(e: SparkListenerTaskStart) =
 println("onTaskStart");
   override def onTaskGettingResult(e:
 SparkListenerTaskGettingResult) = println("onTaskGettingResult");
   override def onTaskEnd(e: SparkListenerTaskEnd) = println(
 "onTaskEnd");
   override def onJobStart(e: SparkListenerJobStart) =
 println("onJobStart");
   override def onJobEnd(e: SparkListenerJobEnd) = println(
 "onJobEnd");
   override def onEnvironmentUpdate(e:
 SparkListenerEnvironmentUpdate) = println("onEnvironmentUpdate");
   override def onBlockManagerAdded(e:
 SparkListenerBlockManagerAdded) = println("onBlockManagerAdded");
   override def onBlockManagerRemoved(e:
 SparkListenerBlockManagerRemoved) = println(
 "onBlockManagerRemoved");
   override def onUnpersistRDD(e: SparkListenerUnpersistRDD) =
 println("onUnpersistRDD");
   override def onApplicationStart(e:
 SparkListenerApplicationStart) = println("onApplicationStart");
   override def onApplicationEnd(e: SparkListenerApplicationEnd) =
 println("onApplicationEnd");
   override def onExecutorMetricsUpdate(e:
 SparkListenerExecutorMetricsUpdate) = println(
 "onExecutorMetricsUpdate");
 });

 sc.parallelize(List(1, 2, 3)).map(throw new
 SparkException("test")).collect();

 =

 On Fri, Apr 17, 2015 at 4:13 PM, Imran Rashid iras...@cloudera.com
 wrote:

 when you start the spark-shell, its already too late to get the
 ApplicationStart event.  Try listening for StageCompleted or JobEnd 
 instead.

 On Fri, Apr 17, 2015 at 5:54 PM, Praveen Balaji 
 secondorderpolynom...@gmail.com 

Re: Spark and accumulo

2015-04-21 Thread andy petrella
Hello Madvi,

Some work has been done by @pomadchin using the spark notebook, maybe you
should come on https://gitter.im/andypetrella/spark-notebook and poke him?
There are some discoveries he made that might be helpful to know.

Also you can poke @lossyrob from Azavea, he did that for geotrellis

my0.2c
andy


On Tue, Apr 21, 2015 at 9:25 AM Akhil Das ak...@sigmoidanalytics.com
wrote:

 You can simply use a custom inputformat (AccumuloInputFormat) with the
 hadoop RDDs (sc.newApiHadoopFile etc) for that, all you need to do is to
 pass the jobConfs. Here's pretty clean discussion:
 http://stackoverflow.com/questions/29244530/how-do-i-create-a-spark-rdd-from-accumulo-1-6-in-spark-notebook#answers-header
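 For reference, a rough Scala sketch of that approach. The instance, zookeepers, user, password and table names below are placeholders, and the configuration calls assume Accumulo 1.6's mapreduce API, so check them against your version:

 import org.apache.accumulo.core.client.ClientConfiguration
 import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat
 import org.apache.accumulo.core.client.security.tokens.PasswordToken
 import org.apache.accumulo.core.data.{Key, Value}
 import org.apache.hadoop.mapreduce.Job
 import org.apache.spark.{SparkConf, SparkContext}

 object AccumuloRDDSketch {
   def main(args: Array[String]): Unit = {
     val sc = new SparkContext(new SparkConf().setAppName("accumulo-rdd-sketch"))

     // Build the job configuration that AccumuloInputFormat expects.
     val job = Job.getInstance()
     AccumuloInputFormat.setConnectorInfo(job, "user", new PasswordToken("password"))
     AccumuloInputFormat.setInputTableName(job, "my_table")
     AccumuloInputFormat.setZooKeeperInstance(job,
       ClientConfiguration.loadDefault().withInstance("my_instance").withZkHosts("zk1:2181"))

     // Hand the configured InputFormat to Spark.
     val rdd = sc.newAPIHadoopRDD(job.getConfiguration,
       classOf[AccumuloInputFormat], classOf[Key], classOf[Value])

     println(rdd.count())
     sc.stop()
   }
 }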

 Thanks
 Best Regards

 On Tue, Apr 21, 2015 at 9:55 AM, madhvi madhvi.gu...@orkash.com wrote:

 Hi all,

 Is there anything to integrate spark with accumulo or make spark to
 process over accumulo data?

 Thanks
 Madhvi Gupta

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org





Different behavioural of HiveContext vs. Hive?

2015-04-21 Thread Ophir Cohen
Lately we upgraded our Spark to 1.3.
Not surprisingly, along the way I found a few incompatibilities between the
versions, which is quite expected.
I found one change whose origin I would like to understand.
env: Amazon EMR, Spark 1.3, Hive 0.13, Hadoop 2.4

In Spark 1.2.1 I ran from the code query such:
SHOW TABLES LIKE '*tmp*'
The query gave me results from the code and from the Hive CLI.

After upgrading to Spark 1.3, from Hive CLI it still works but I get the
below exception from Spark.
Changing to other formats such as:
SHOW TABLES *.tmp
or
SHOW TABLES in default *.tmp
Yield with the same exception...
Any ideas?








hc.sql("SHOW TABLES LIKE '*tmp*'")
java.lang.RuntimeException: [1.13] failure: ``in'' expected but identifier LIKE found

SHOW TABLES LIKE '*tmp*'
            ^
    at scala.sys.package$.error(package.scala:27)
    at org.apache.spark.sql.catalyst.AbstractSparkSQLParser.apply(AbstractSparkSQLParser.scala:40)
    at org.apache.spark.sql.hive.HiveQl$.parseSql(HiveQl.scala


Re: Running spark over HDFS

2015-04-21 Thread Akhil Das
Your spark master should be spark://swetha:7077 :)
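For example (the host name must match exactly what your standalone master's web UI shows in its top-left corner):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("my-app")                // any app name
  .setMaster("spark://swetha:7077")    // the URL advertised by the master web UI
val sc = new SparkContext(conf)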

Thanks
Best Regards

On Mon, Apr 20, 2015 at 2:44 PM, madhvi madhvi.gu...@orkash.com wrote:

  PFA screenshot of my cluster UI

 Thanks
 On Monday 20 April 2015 02:27 PM, Akhil Das wrote:

  Are you seeing your task being submitted to the UI? Under completed or
 running tasks? How much resources are you allocating for your job? Can you
 share a screenshot of your cluster UI and the code snippet that you are
 trying to run?

  Thanks
 Best Regards

 On Mon, Apr 20, 2015 at 12:37 PM, madhvi madhvi.gu...@orkash.com wrote:

  Hi,

 I Did the same you told but now it is giving the following error:
 ERROR TaskSchedulerImpl: Exiting due to error from cluster scheduler: All
 masters are unresponsive! Giving up.

 On UI it is showing that master is working

 Thanks
 Madhvi

 On Monday 20 April 2015 12:28 PM, Akhil Das wrote:

  In your eclipse, while you create your SparkContext, set the master uri
 as shown in the web UI's top left corner like: spark://someIPorHost:7077
 and it should be fine.

  Thanks
 Best Regards

 On Mon, Apr 20, 2015 at 12:22 PM, madhvi madhvi.gu...@orkash.com wrote:

 Hi All,

 I am new to spark and have installed spark cluster over my system having
 hadoop cluster.I want to process data stored in HDFS through spark.

 When I am running code in eclipse it is giving the following warning
 repeatedly:
 scheduler.TaskSchedulerImpl: Initial job has not accepted any resources;
 check your cluster UI to ensure that workers are registered and have
 sufficient resources.

 I have made changes to spark-env.sh file as below
 export SPARK_WORKER_INSTANCES=1
 export HADOOP_CONF_DIR=/root/Documents/hadoop/etc/hadoop
 export SPARK_WORKER_MEMORY=1g
 export SPARK_WORKER_CORES=2
 export SPARK_EXECUTOR_MEMORY=1g

 I am running the spark standalone cluster.In cluster UI it is showing
 all workers with allocated resources but still its not working.
 what other configurations are needed to be changed?

 Thanks
 Madhvi Gupta

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org







 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org



Re: Different behavioural of HiveContext vs. Hive?

2015-04-21 Thread Ophir Cohen
BTW
This:
hc.sql(show tables).collect

Works great!

On Tue, Apr 21, 2015 at 10:49 AM, Ophir Cohen oph...@gmail.com wrote:

 Lately we upgraded our Spark to 1.3.
 Not surprisingly, along the way I found a few incompatibilities between the
 versions, which is quite expected.
 I found one change whose origin I would like to understand.
 env: Amazon EMR, Spark 1.3, Hive 0.13, Hadoop 2.4

 In Spark 1.2.1 I ran from the code query such:
 SHOW TABLES LIKE '*tmp*'
 The query gave me results from the code and from the Hive CLI.

 After upgrading to Spark 1.3, from Hive CLI it still works but I get the
 below exception from Spark.
 Changing to other formats such as:
 SHOW TABLES *.tmp
 or
 SHOW TABLES in default *.tmp
 Yield with the same exception...
 Any ideas?








 hc.sql("SHOW TABLES LIKE '*tmp*'")
 java.lang.RuntimeException: [1.13] failure: ``in'' expected but identifier LIKE found

 SHOW TABLES LIKE '*tmp*'
             ^
     at scala.sys.package$.error(package.scala:27)
     at org.apache.spark.sql.catalyst.AbstractSparkSQLParser.apply(AbstractSparkSQLParser.scala:40)
     at org.apache.spark.sql.hive.HiveQl$.parseSql(HiveQl.scala



Re: MLlib - Collaborative Filtering - trainImplicit task size

2015-04-21 Thread Sean Owen
I think maybe you need more partitions in your input, which might make
for smaller tasks?
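For example, something along these lines (the partition count of 200, the rank and the iteration count are arbitrary placeholders; pick values that suit your cluster and data):

import org.apache.spark.mllib.recommendation.{ALS, Rating}
import org.apache.spark.rdd.RDD

def trainWithMorePartitions(ratings: RDD[Rating]) = {
  // More, smaller input partitions usually mean smaller serialized tasks.
  val repartitioned = ratings.repartition(200)
  ALS.trainImplicit(repartitioned, /* rank = */ 10, /* iterations = */ 10)
}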

On Tue, Apr 21, 2015 at 2:56 AM, Christian S. Perone
christian.per...@gmail.com wrote:
 I keep seeing these warnings when using trainImplicit:

 WARN TaskSetManager: Stage 246 contains a task of very large size (208 KB).
 The maximum recommended task size is 100 KB.

 And then the task size starts to increase. Is this a known issue ?

 Thanks !

 --
 Blog | Github | Twitter
 Forgive, O Lord, my little jokes on Thee, and I'll forgive Thy great big
 joke on me.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Cassandra Connection Issue with Spark-jobserver

2015-04-21 Thread Anand
*I am new to Spark world and Job Server

My Code :*

package spark.jobserver

import java.nio.ByteBuffer

import scala.collection.JavaConversions._
import scala.collection.mutable.ListBuffer
import scala.collection.immutable.Map

import org.apache.cassandra.hadoop.ConfigHelper
import org.apache.cassandra.hadoop.cql3.CqlPagingInputFormat
import org.apache.cassandra.hadoop.cql3.CqlConfigHelper
import org.apache.cassandra.hadoop.cql3.CqlOutputFormat
import org.apache.cassandra.utils.ByteBufferUtil
import org.apache.hadoop.mapreduce.Job

import com.typesafe.config.{Config, ConfigFactory}
import org.apache.spark._
import org.apache.spark.SparkContext._
import scala.util.Try

object CassandraCQLTest extends SparkJob{

  def main(args: Array[String]) {
    val sc = new SparkContext("local[4]", "CassandraCQLTest")

    sc.addJar("/extra_data/spark-cassandra-connector/spark-cassandra-connector-java/target/scala-2.10/spark-cassandra-connector-java-assembly-1.3.0-SNAPSHOT.jar")
    val config = ConfigFactory.parseString("")
    val results = runJob(sc, config)
    println("Result is " + results)
  }

  override def validate(sc: SparkContext, config: Config):
SparkJobValidation = {
    Try(config.getString("input.string"))
      .map(x => SparkJobValid)
      .getOrElse(SparkJobInvalid("No input.string config param"))
  }

  override def runJob(sc: SparkContext, config: Config): Any = {
    val cHost: String = "localhost"
    val cPort: String = "9160"
    val KeySpace = "retail"
    val InputColumnFamily = "ordercf"
    val OutputColumnFamily = "salecount"

    val job = new Job()
    job.setInputFormatClass(classOf[CqlPagingInputFormat])
    ConfigHelper.setInputInitialAddress(job.getConfiguration(), cHost)
    ConfigHelper.setInputRpcPort(job.getConfiguration(), cPort)
    ConfigHelper.setInputColumnFamily(job.getConfiguration(), KeySpace,
      InputColumnFamily)
    ConfigHelper.setInputPartitioner(job.getConfiguration(),
      "Murmur3Partitioner")
    CqlConfigHelper.setInputCQLPageRowSize(job.getConfiguration(), "3")

    /** CqlConfigHelper.setInputWhereClauses(job.getConfiguration(),
        "user_id='bob'") */

    /** An UPDATE writes one or more columns to a record in a Cassandra
        column family */
    val query = "UPDATE " + KeySpace + "." + OutputColumnFamily + " SET sale_count = ? "
    CqlConfigHelper.setOutputCql(job.getConfiguration(), query)

    job.setOutputFormatClass(classOf[CqlOutputFormat])
    ConfigHelper.setOutputColumnFamily(job.getConfiguration(), KeySpace,
      OutputColumnFamily)
    ConfigHelper.setOutputInitialAddress(job.getConfiguration(), cHost)
    ConfigHelper.setOutputRpcPort(job.getConfiguration(), cPort)
    ConfigHelper.setOutputPartitioner(job.getConfiguration(),
      "Murmur3Partitioner")

    val casRdd = sc.newAPIHadoopRDD(job.getConfiguration(),
      classOf[CqlPagingInputFormat],
      classOf[java.util.Map[String, ByteBuffer]],
      classOf[java.util.Map[String, ByteBuffer]])

    val productSaleRDD = casRdd.map {
      case (key, value) => {
        (ByteBufferUtil.string(value.get("prod_id")),
          ByteBufferUtil.toInt(value.get("quantity")))
      }
    }
    val aggregatedRDD = productSaleRDD.reduceByKey(_ + _)
    aggregatedRDD.collect().foreach {
      case (productId, saleCount) => println(productId + ":" + saleCount)
    }

    val casoutputCF = aggregatedRDD.map {
      case (productId, saleCount) => {
        val outColFamKey = Map("prod_id" -> ByteBufferUtil.bytes(productId))
        val outKey: java.util.Map[String, ByteBuffer] = outColFamKey
        var outColFamVal = new ListBuffer[ByteBuffer]
        outColFamVal += ByteBufferUtil.bytes(saleCount)
        val outVal: java.util.List[ByteBuffer] = outColFamVal
        (outKey, outVal)
      }
    }

    casoutputCF.saveAsNewAPIHadoopFile(
      KeySpace,
      classOf[java.util.Map[String, ByteBuffer]],
      classOf[java.util.List[ByteBuffer]],
      classOf[CqlOutputFormat],
      job.getConfiguration()
    )
    casRdd.count
  }
}

*When I push the Jar using spark-jobServer and execute it I get this on
spark-jobserver terminal
*
job-server[ERROR] Exception in thread pool-1-thread-1
java.lang.NoClassDefFoundError:
org/apache/cassandra/hadoop/cql3/CqlPagingInputFormat
job-server[ERROR]   at
spark.jobserver.CassandraCQLTest$.runJob(CassandraCQLTest.scala:46)
job-server[ERROR]   at
spark.jobserver.CassandraCQLTest$.runJob(CassandraCQLTest.scala:21)
job-server[ERROR]   at
spark.jobserver.JobManagerActor$$anonfun$spark$jobserver$JobManagerActor$$getJobFuture$4.apply(JobManagerActor.scala:235)
job-server[ERROR]   at
scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
job-server[ERROR]   at
scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
job-server[ERROR]   at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
job-server[ERROR]   at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
job-server[ERROR]   at 

Re: Custom Partitioning Spark

2015-04-21 Thread MUHAMMAD AAMIR
Hi Archit,

 Thanks a lot for your reply. I am using rdd.partitions.length to check
 the number of partitions; rdd.partitions returns the array of partitions.
 I would like to add one more question here: do you have any idea how to get
 the objects in each partition? Further, is there any way to figure out
 which particular partition an object belongs to?

Thanks,

On Tue, Apr 21, 2015 at 12:16 PM, Archit Thakur archit279tha...@gmail.com
wrote:

 Hi,

 This should work. How are you checking the no. of partitions.?

 Thanks and Regards,
 Archit Thakur.

 On Mon, Apr 20, 2015 at 7:26 PM, mas mas.ha...@gmail.com wrote:

 Hi,

 I aim to do custom partitioning on a text file. I first convert it into
 pairRDD and then try to use my custom partitioner. However, somehow it is
 not working. My code snippet is given below.

 val file = sc.textFile(filePath)
 val locLines = file.map(line => line.split("\t")).map(line =>
 ((line(2).toDouble, line(3).toDouble), line(5).toLong))
 val ck = locLines.partitionBy(new HashPartitioner(50)) // new
 CustomPartitioner(50) -- neither way is working here.

 while reading the file using textFile method it automatically partitions
 the file. However when i explicitly want to partition the new rdd
 locLines, It doesn't appear to do anything and even the number of
 partitions are same which is created by sc.textFile().

 Any help in this regard will be highly appreciated.




 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Custom-Partitioning-Spark-tp22571.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org





-- 
Regards,
Muhammad Aamir


*CONFIDENTIALITY:This email is intended solely for the person(s) named and
may be confidential and/or privileged.If you are not the intended
recipient,please delete it,notify me and do not copy,use,or disclose its
content.*


Re: implicits is not a member of org.apache.spark.sql.SQLContext

2015-04-21 Thread Ted Yu
Have you tried the following ?
  import sqlContext._
  import sqlContext.implicits._
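  If the IDE keeps flagging the implicit import, an explicit conversion sidesteps it entirely; a minimal sketch (assuming an existing SparkContext named sc and the same Person case class):

  import org.apache.spark.sql.SQLContext

  case class Person(name: String, age: Int)

  val sqlContext = new SQLContext(sc)

  // createDataFrame on an RDD of case classes does not rely on the
  // sqlContext.implicits._ import, so the IDE has nothing to resolve implicitly.
  val peopleRDD = sc.textFile("examples/src/main/resources/people.txt")
    .map(_.split(","))
    .map(p => Person(p(0), p(1).trim.toInt))
  val people = sqlContext.createDataFrame(peopleRDD)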

Cheers

On Tue, Apr 21, 2015 at 7:54 AM, Wang, Ningjun (LNG-NPV) 
ningjun.w...@lexisnexis.com wrote:

  I tried to convert an RDD to a data frame using the example codes on
 spark website





 case class Person(name: String, age: Int)



 val sqlContext = new org.apache.spark.sql.SQLContext(sc)
 import sqlContext.implicits._

  val people  =
  sc.textFile("examples/src/main/resources/people.txt").map(_.split(",")).map(p
  => Person(p(0), p(1).trim.toInt)).toDF()



 It compile fine using sbt. But in IntelliJ IDEA 14.0.3, it fail to compile
 and generate the following error



 Error:(289, 23) value *implicits is not a member of
 org.apache.spark.sql.SQLContext*

 import sqlContext.implicits._

   ^



 What is the problem here? Is there any work around (e.g. do explicit
 conversion)?



 Here is my build.sbt



 name := my-project



 version := 0.2



 scalaVersion := 2.10.4



 val sparkVersion = 1.3.1



 val luceneVersion = 4.10.2



 libraryDependencies = scalaVersion {

   scala_version = Seq(

 org.apache.spark %% spark-core % sparkVersion  % provided,

 org.apache.spark %% spark-mllib % sparkVersion % provided,

 spark.jobserver % job-server-api % 0.4.1 % provided,

 org.scalatest %% scalatest % 2.2.1 % test

   )

 }

 resolvers += Spark Packages Repo at 
 http://dl.bintray.com/spark-packages/maven;

 resolvers += Resolver.mavenLocal





 Ningjun





implicits is not a member of org.apache.spark.sql.SQLContext

2015-04-21 Thread Wang, Ningjun (LNG-NPV)
I tried to convert an RDD to a data frame using the example code on the Spark
website:



case class Person(name: String, age: Int)


val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._

val people  =
sc.textFile("examples/src/main/resources/people.txt").map(_.split(",")).map(p
=> Person(p(0), p(1).trim.toInt)).toDF()

It compiles fine using sbt. But in IntelliJ IDEA 14.0.3, it fails to compile and
generates the following error:

Error:(289, 23) value implicits is not a member of 
org.apache.spark.sql.SQLContext
import sqlContext.implicits._
  ^

What is the problem here? Is there any workaround (e.g. doing the conversion
explicitly)?

Here is my build.sbt

name := "my-project"

version := "0.2"

scalaVersion := "2.10.4"

val sparkVersion = "1.3.1"

val luceneVersion = "4.10.2"

libraryDependencies <<= scalaVersion {
  scala_version => Seq(
    "org.apache.spark" %% "spark-core"  % sparkVersion % "provided",
    "org.apache.spark" %% "spark-mllib" % sparkVersion % "provided",
    "spark.jobserver"  %  "job-server-api" % "0.4.1"   % "provided",
    "org.scalatest"    %% "scalatest"   % "2.2.1"      % "test"
  )
}
resolvers += "Spark Packages Repo" at
"http://dl.bintray.com/spark-packages/maven"
resolvers += Resolver.mavenLocal


Ningjun



Re: Join on DataFrames from the same source (Pyspark)

2015-04-21 Thread Karlson

Sorry, my code actually was

df_one = df.select('col1', 'col2')
df_two = df.select('col1', 'col3')

But in Spark 1.4.0 this does not seem to make any difference anyway and 
the problem is the same with both versions.



On 2015-04-21 17:04, ayan guha wrote:

your code should be

 df_one = df.select('col1', 'col2')
 df_two = df.select('col1', 'col3')

Your current code is generating a tuple, and of course df_1 and df_2 are
different, so the join degenerates into a cartesian product.

Best
Ayan

On Wed, Apr 22, 2015 at 12:42 AM, Karlson ksonsp...@siberie.de wrote:


Hi,

can anyone confirm (and if so elaborate on) the following problem?

When I join two DataFrames that originate from the same source 
DataFrame,
the resulting DF will explode to a huge number of rows. A quick 
example:


I load a DataFrame with n rows from disk:

df = sql_context.parquetFile('data.parquet')

Then I create two DataFrames from that source.

df_one = df.select(['col1', 'col2'])
df_two = df.select(['col1', 'col3'])

Finally I want to (inner) join them back together:

df_joined = df_one.join(df_two, df_one['col1'] == df_two['col2'],
'inner')

The key in col1 is unique. The resulting DataFrame should have n rows,
however it does have n*n rows.

That does not happen, when I load df_one and df_two from disk 
directly. I
am on Spark 1.3.0, but this also happens on the current 1.4.0 
snapshot.


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org




-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Clustering algorithms in Spark

2015-04-21 Thread Jeetendra Gangele
The problem with k-means is that we have to define the number of clusters,
which I don't want in this case.
So I am thinking of something like hierarchical clustering. Any ideas or
suggestions?



On 21 April 2015 at 20:51, Jeetendra Gangele gangele...@gmail.com wrote:

 I have a requirement in which I want to match the company name .. and I am
 thinking to solve this using clustering technique.

 Can anybody suggest which algo I should Use in Spark and how to evaluate
 the running time and accuracy for this particular problem.

 I checked K means looks good.
 Any idea suggestions?




Re: Join on DataFrames from the same source (Pyspark)

2015-04-21 Thread ayan guha
your code should be

 df_one = df.select('col1', 'col2')
 df_two = df.select('col1', 'col3')

Your current code is generating a tuple, and of course df_1 and df_2 are
different, so the join degenerates into a cartesian product.
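If the row explosion persists even with the corrected select, one workaround often suggested for joins whose two sides share the same lineage is to alias each side before joining. A Scala sketch of that idea (column names taken from the example; not verified against this exact Spark version):

import org.apache.spark.sql.SQLContext

val sqlContext = new SQLContext(sc)
import sqlContext.implicits._

val df = sqlContext.parquetFile("data.parquet")

// Alias both sides so the join condition refers to unambiguous columns.
val one = df.select("col1", "col2").as("one")
val two = df.select("col1", "col3").as("two")

val joined = one.join(two, $"one.col1" === $"two.col1", "inner")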

Best
Ayan

On Wed, Apr 22, 2015 at 12:42 AM, Karlson ksonsp...@siberie.de wrote:

 Hi,

 can anyone confirm (and if so elaborate on) the following problem?

 When I join two DataFrames that originate from the same source DataFrame,
 the resulting DF will explode to a huge number of rows. A quick example:

 I load a DataFrame with n rows from disk:

 df = sql_context.parquetFile('data.parquet')

 Then I create two DataFrames from that source.

 df_one = df.select(['col1', 'col2'])
 df_two = df.select(['col1', 'col3'])

 Finally I want to (inner) join them back together:

 df_joined = df_one.join(df_two, df_one['col1'] == df_two['col2'],
 'inner')

 The key in col1 is unique. The resulting DataFrame should have n rows,
 however it does have n*n rows.

 That does not happen, when I load df_one and df_two from disk directly. I
 am on Spark 1.3.0, but this also happens on the current 1.4.0 snapshot.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




-- 
Best Regards,
Ayan Guha


Re: Custom Partitioning Spark

2015-04-21 Thread ayan guha
Are you looking for?

*mapPartitions*(*func*): Similar to map, but runs separately on each
partition (block) of the RDD, so *func* must be of type Iterator[T] =>
Iterator[U] when running on an RDD of type T.

*mapPartitionsWithIndex*(*func*): Similar to mapPartitions, but also provides
*func* with an integer value representing the index of the partition, so
*func* must be of type (Int, Iterator[T]) => Iterator[U] when running on an
RDD of type T.
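For example, building on the `ck` pair RDD from the earlier snippet (the sample key below is a placeholder):

import org.apache.spark.HashPartitioner

// 1. List which (key, value) objects ended up in which partition.
val perPartition = ck.mapPartitionsWithIndex { (idx, iter) =>
  iter.map(kv => (idx, kv))
}
perPartition.take(20).foreach { case (idx, kv) => println(s"partition $idx -> $kv") }

// 2. Ask the partitioner directly which partition a given key maps to.
val partitioner = new HashPartitioner(50)
println(partitioner.getPartition((12.34, 56.78)))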

On Wed, Apr 22, 2015 at 1:00 AM, MUHAMMAD AAMIR mas.ha...@gmail.com wrote:

 Hi Archit,

 Thanks a lot for your reply. I am using rdd.partitions.length to check
 the number of partitions. rdd.partitions return the array of partitions.
 I would like to add one more question here do you have any idea how to get
 the objects in each partition ? Further is there any way to figure out
 which particular partitions an object bleongs ?

 Thanks,

 On Tue, Apr 21, 2015 at 12:16 PM, Archit Thakur archit279tha...@gmail.com
  wrote:

 Hi,

 This should work. How are you checking the no. of partitions.?

 Thanks and Regards,
 Archit Thakur.

 On Mon, Apr 20, 2015 at 7:26 PM, mas mas.ha...@gmail.com wrote:

 Hi,

 I aim to do custom partitioning on a text file. I first convert it into
 pairRDD and then try to use my custom partitioner. However, somehow it is
 not working. My code snippet is given below.

 val file = sc.textFile(filePath)
 val locLines = file.map(line => line.split("\t")).map(line =>
 ((line(2).toDouble, line(3).toDouble), line(5).toLong))
 val ck = locLines.partitionBy(new HashPartitioner(50)) // new
 CustomPartitioner(50) -- neither way is working here.

 while reading the file using textFile method it automatically
 partitions
 the file. However when i explicitly want to partition the new rdd
 locLines, It doesn't appear to do anything and even the number of
 partitions are same which is created by sc.textFile().

 Any help in this regard will be highly appreciated.




 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Custom-Partitioning-Spark-tp22571.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org





 --
 Regards,
 Muhammad Aamir


 *CONFIDENTIALITY:This email is intended solely for the person(s) named and
 may be confidential and/or privileged.If you are not the intended
 recipient,please delete it,notify me and do not copy,use,or disclose its
 content.*




-- 
Best Regards,
Ayan Guha


Re: Meet Exception when learning Broadcast Variables

2015-04-21 Thread Ted Yu
Does line 27 correspond to brdcst.value ?

Cheers



 On Apr 21, 2015, at 3:19 AM, donhoff_h 165612...@qq.com wrote:
 
 Hi, experts.
 
 I wrote a very small program to learn how to use broadcast variables, but
 hit an exception. The program and the exception are listed below.
 Could anyone help me solve this problem? Thanks!
 
 **My Program is as following**
 object TestBroadcast02 {
  var brdcst : Broadcast[Array[Int]] = null
 
  def main(args: Array[String]) {
val conf = new SparkConf()
val sc = new SparkContext(conf)
brdcst = sc.broadcast(Array(1,2,3,4,5,6))
    val rdd =
 sc.textFile("hdfs://bgdt-dev-hrb/user/spark/tst/charset/A_utf8.txt")
rdd.foreachPartition(fun1)
sc.stop()
  }
 
  def fun1(it : Iterator[String]) : Unit = {
val v = brdcst.value
    for (i <- v) println("BroadCast Variable:" + i)
    for (j <- it) println("Text File Content:" + j)
  }
 } 
 **The Exception is as following**
 15/04/21 17:39:53 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 0.0 
 (TID 0, bgdt01.dev.hrb): java.lang.NullPointerException
 at 
 dhao.test.BroadCast.TestBroadcast02$.fun1(TestBroadcast02.scala:27)
 at 
 dhao.test.BroadCast.TestBroadcast02$$anonfun$main$1.apply(TestBroadcast02.scala:22)
 at 
 dhao.test.BroadCast.TestBroadcast02$$anonfun$main$1.apply(TestBroadcast02.scala:22)
 at 
 org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:806)
 at 
 org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:806)
 at 
 org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1497)
 at 
 org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1497)
 at 
 org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
 at org.apache.spark.scheduler.Task.run(Task.scala:64)
 at 
 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
 at 
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
 at 
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 at java.lang.Thread.run(Thread.java:745)
 
 By the way, if I use an anonymous function instead of 'fun1' in my program, it
 works. But since I think anonymous functions are less readable,
 I still prefer to use 'fun1'.
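For what it's worth, a sketch of a variant that should keep the named-style readability while avoiding the NPE. The likely cause is that the singleton's `var brdcst` is still null on the executors (main() only runs on the driver), so capturing the broadcast in a local val keeps the closure self-contained; the HDFS path is copied from the snippet above and the object name is invented:

import org.apache.spark.{SparkConf, SparkContext}

object TestBroadcast03 {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf())
    // Local val: the closure captures the Broadcast handle itself rather than
    // the enclosing object, whose fields are not initialized on the executors.
    val brdcst = sc.broadcast(Array(1, 2, 3, 4, 5, 6))
    val rdd = sc.textFile("hdfs://bgdt-dev-hrb/user/spark/tst/charset/A_utf8.txt")
    rdd.foreachPartition { it =>
      val v = brdcst.value
      for (i <- v) println("BroadCast Variable:" + i)
      for (j <- it) println("Text File Content:" + j)
    }
    sc.stop()
  }
}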

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Problem with using Spark ML

2015-04-21 Thread Staffan
Hi,
I've written an application that performs some machine learning on some
data. I've validated that the data _should_ give a good output with a decent
RMSE by using Lib-SVM:
Mean squared error = 0.00922063 (regression)
Squared correlation coefficient = 0.9987 (regression)

When I try to use Spark ML to do the exact same thing I get:
Mean Squared Error = 8.466193152067944E224

Which is somewhat worse... I've tried to look at the data before it's
fed to the model, and printed that data to file (which is actually the data
used when I got the result from Lib-SVM above). Somewhere there must be a
huge mistake, but I cannot place it anywhere in my code (see below).
trainingLP and testLP are the training and test data, as RDD[LabeledPoint].

// Generate model
val model_gen = new RidgeRegressionWithSGD();
val model = model_gen.run(trainingLP);

// Predict on the test-data
val valuesAndPreds = testLP.map { point =>
    val prediction = model.predict(point.features);
    println("label: " + point.label + ", pred: " + prediction);
    (point.label, prediction);
}
val MSE = valuesAndPreds.map { case (v, p) => math.pow((v - p), 2) }.mean();
println("Mean Squared Error = " + MSE)


I've printed label and prediction-values for each data-point in the testset,
and the result is something like this;
label: 5.04, pred: -4.607899000641277E112
label: 3.59, pred: -3.96787105480399E112
label: 5.06, pred: -2.8263294374576145E112
label: 2.85, pred: -1.1536508029072844E112
label: 2.1, pred: -4.269312783707508E111
label: 2.75, pred: -3.0072665148591558E112
label: -0.29, pred: -2.035681731641989E112
label: 1.98, pred: -3.163404340354783E112

So there is obviously something wrong with the prediction step. I'm using
the SparseVector representation of the Vector in LabeledPoint, looking
something like this for reference (shortened for convenience);
(-1.59,(2080,[29,59,62,74,127,128,131,144,149,175,198,200,239,247,267,293,307,364,374,393,410,424,425,431,448,469,477,485,501,525,532,533,538,560,..],[1.0,1.0,2.0,8.0,1.0,1.0,6.0,1.0,2.0,1.0,1.0,1.0,1.0,1.0,5.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,3.0,1.0,1.0,2.0,1.0,1.0,1.0,1.0,1.0,1.0,8.0,2.0,1.0,1.0,..]))
(-1.75,(2080,[103,131,149,208,296,335,520,534,603,620,661,694,709,748,859,1053,1116,1156,1186,1207,1208,1223,1256,1278,1356,1375,1399,1480,1569,..],[1.0,1.0,1.0,1.0,1.0,1.0,1.0,4.0,2.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,2.0,1.0,2.0,2.0,2.0,1.0,1.0,2.0,1.0,1.0,1.0,1.0,1.0,1.0,2.0,4.0,1.0,7.0,1.0,3.0,2.0,1.0]))

I do get one type of warning, but that's about it! (And as to my
understanding, this native code is not required to get the correct results,
only to improve performance). 
6010 [main] WARN  com.github.fommil.netlib.BLAS  - Failed to load
implementation from: com.github.fommil.netlib.NativeSystemBLAS
6011 [main] WARN  com.github.fommil.netlib.BLAS  - Failed to load
implementation from: com.github.fommil.netlib.NativeRefBLAS

So where do I go from here? 



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Problem-with-using-Spark-ML-tp22591.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Spark Streaming updatyeStateByKey throws OutOfMemory Error

2015-04-21 Thread Sourav Chandra
Hi,

We are building a spark streaming application which reads from kafka, does
updateStateBykey based on the received message type and finally stores into
redis.

After running for a few seconds the executor process gets killed with an
OutOfMemory error.

The code snippet is below:


NoOfReceiverInstances = 1

val kafkaStreams = (1 to NoOfReceiverInstances).map(
  _ => KafkaUtils.createStream(ssc, ZKQuorum, ConsumerGroup, TopicsMap)
)
val updateFunc = (values: Seq[IConcurrentUsers], state: Option[(Long,
Long)]) => {...}

ssc.union(kafkaStreams).map(KafkaMessageMapper(_)).filter(...)..updateStateByKey(updateFunc).foreachRDD(_.foreachPartition(RedisHelper.update(_)))



object RedisHelper {
  private val client = scredis.Redis(
    ConfigFactory.parseProperties(System.getProperties).getConfig("namespace")
  )

  def update(itr: Iterator[(String, (Long, Long))]) {
    // redis save operation
  }

}


*Below is the spark configuration:*


*spark.app.name = XXX*
*spark.jars = .jar*
*spark.home = /spark-1.1.1-bin-hadoop2.4*
*spark.executor.memory = 1g*
*spark.streaming.concurrentJobs = 1000*
*spark.logConf = true*
*spark.cleaner.ttl = 3600 //in milliseconds*
*spark.default.parallelism = 12*
*spark.executor.extraJavaOptions = -Xloggc:gc.log -XX:+PrintGCDetails
-XX:+UseConcMarkSweepGC -XX:HeapDumpPath=1.hprof
-XX:+HeapDumpOnOutOfMemoryError*
*spark.executor.logs.rolling.strategy = size*
*spark.executor.logs.rolling.size.maxBytes = 104857600 // 100 MB*
*spark.executor.logs.rolling.maxRetainedFiles = 10*
*spark.serializer = org.apache.spark.serializer.KryoSerializer*
*spark.kryo.registrator = xxx.NoOpKryoRegistrator*


other configurations are below

*streaming {*
*// All streaming context related configs should come here*
*batch-duration = 1 second*
*checkpoint-directory = /tmp*
*checkpoint-duration = 10 seconds*
*slide-duration = 1 second*
*window-duration = 1 second*
*partitions-for-shuffle-task = 32*
*  }*
*  kafka {*
*no-of-receivers = 1*
*zookeeper-quorum = :2181*
*consumer-group = x*
*topic = x:2*
*  }*

We tried different combinations like
 - with spark 1.1.0 and 1.1.1.
 - by increasing executor memory
 - by changing the serialization strategy (switching between kryo and
normal java)
 - by changing broadcast strategy (switching between http and torrent
broadcast)


Can anyone give any insight what we are missing here? How can we fix this?

Due to akka version mismatch with some other libraries we cannot upgrade
the spark version.

Thanks,
-- 

Sourav Chandra

Senior Software Engineer

· · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · ·

sourav.chan...@livestream.com

o: +91 80 4121 8723

m: +91 988 699 3746

skype: sourav.chandra

Livestream

Ajmera Summit, First Floor, #3/D, 68 Ward, 3rd Cross, 7th C Main, 3rd
Block, Koramangala Industrial Area,

Bangalore 560034

www.livestream.com


Re: Column renaming after DataFrame.groupBy

2015-04-21 Thread ayan guha
Hi

There are 2 ways of doing it.

1. Using SQL - this method directly creates another dataframe object.
2. Using methods of the DF object, but in that case you have to provide the
schema through a row object. In this case you need to explicitly call
createDataFrame again which will infer the schema for you.

Here is python code
Method 1:
userStat = ssc.sql("select userId, sum(rating) total from ratings group by userId")
print userStat.collect()[10]
userStat.printSchema()

Method 2:
userStatDF = userStat.groupBy("userId").sum().map(lambda t:
    Row(userId=t[0], total=t[1]))
userStatDFSchema = ssc.createDataFrame(userStatDF)
print type(userStatDFSchema)
print userStatDFSchema.printSchema()

Output:
Row(userId=233, total=478)
root
 |-- userId: long (nullable = true)
 |-- total: long (nullable = true)

class 'pyspark.sql.dataframe.DataFrame'
root
 |-- total: long (nullable = true)
 |-- userId: long (nullable = true)

As you can see, the downside of Method 2 is that the order of the fields is now
inferred (they are most likely created in a dict under the hood), so they end up
ordered alphabetically.

Hope this helps

Best
Ayan

On Tue, Apr 21, 2015 at 6:06 PM, Justin Yip yipjus...@prediction.io wrote:

 Hello,

 I would like rename a column after aggregation. In the following code, the
 column name is SUM(_1#179), is there a way to rename it to a more
 friendly name?

 scala val d = sqlContext.createDataFrame(Seq((1, 2), (1, 3), (2, 10)))
 scala d.groupBy(_1).sum().printSchema
 root
  |-- _1: integer (nullable = false)
  |-- SUM(_1#179): long (nullable = true)
  |-- SUM(_2#180): long (nullable = true)

 Thanks.

 Justin

 --
 View this message in context: Column renaming after DataFrame.groupBy
 http://apache-spark-user-list.1001560.n3.nabble.com/Column-renaming-after-DataFrame-groupBy-tp22586.html
 Sent from the Apache Spark User List mailing list archive
 http://apache-spark-user-list.1001560.n3.nabble.com/ at Nabble.com.




-- 
Best Regards,
Ayan Guha


Re: Custom Partitioning Spark

2015-04-21 Thread Archit Thakur
Hi,

This should work. How are you checking the no. of partitions.?

Thanks and Regards,
Archit Thakur.

On Mon, Apr 20, 2015 at 7:26 PM, mas mas.ha...@gmail.com wrote:

 Hi,

 I aim to do custom partitioning on a text file. I first convert it into
 pairRDD and then try to use my custom partitioner. However, somehow it is
 not working. My code snippet is given below.

 val file = sc.textFile(filePath)
 val locLines = file.map(line => line.split("\t")).map(line =>
 ((line(2).toDouble, line(3).toDouble), line(5).toLong))
 val ck = locLines.partitionBy(new HashPartitioner(50)) // new
 CustomPartitioner(50) -- neither way is working here.

 while reading the file using textFile method it automatically partitions
 the file. However when i explicitly want to partition the new rdd
 locLines, It doesn't appear to do anything and even the number of
 partitions are same which is created by sc.textFile().

 Any help in this regard will be highly appreciated.




 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Custom-Partitioning-Spark-tp22571.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Spark 1.3.1 Dataframe breaking ALS.train?

2015-04-21 Thread ayan guha
Hi

I am getting an error

Also, I am getting an error in mlib.ALS.train function when passing
dataframe (do I need to convert the DF to RDD?)

Code:
training = ssc.sql("select userId, movieId, rating from ratings where partitionKey < 6").cache()
print type(training)
model = ALS.train(training,rank,numIter,lmbda)

Error:
class 'pyspark.sql.dataframe.DataFrame'

Traceback (most recent call last):
  File D:\Project\Spark\code\movie_sql.py, line 109, in module
bestConf = getBestModel(sc,ssc,training,validation,validationNoRating)
  File D:\Project\Spark\code\movie_sql.py, line 54, in getBestModel
model = ALS.train(trainingRDD,rank,numIter,lmbda)
  File
D:\spark\spark-1.3.1-bin-hadoop2.6\spark-1.3.1-bin-hadoop2.6\spark-1.3.1-bin-hadoop2.6\python\pyspark\mllib\recommendation.py,
line 139, in train
model = callMLlibFunc(trainALSModel, cls._prepare(ratings), rank,
iterations,
  File
D:\spark\spark-1.3.1-bin-hadoop2.6\spark-1.3.1-bin-hadoop2.6\spark-1.3.1-bin-hadoop2.6\python\pyspark\mllib\recommendation.py,
line 127, in _prepare
assert isinstance(ratings, RDD), ratings should be RDD
AssertionError: ratings should be RDD

It was working fine in 1.2.0 (till last night :))

Any solution? I am thinking of mapping the training dataframe back to an RDD,
but then I will lose the schema information.

Best
Ayan

On Mon, Apr 20, 2015 at 10:23 PM, ayan guha guha.a...@gmail.com wrote:

 Hi
 Just upgraded to Spark 1.3.1.

 I am getting an warning

 Warning (from warnings module):
   File
 D:\spark\spark-1.3.1-bin-hadoop2.6\spark-1.3.1-bin-hadoop2.6\spark-1.3.1-bin-hadoop2.6\python\pyspark\sql\context.py,
 line 191
 warnings.warn(inferSchema is deprecated, please use createDataFrame
 instead)
 UserWarning: inferSchema is deprecated, please use createDataFrame instead

 However, documentation still says to use inferSchema.
 Here: http://spark.apache.org/docs/latest/sql-programming-guide.htm in
 section

 Also, I am getting an error in mlib.ALS.train function when passing
 dataframe (do I need to convert the DF to RDD?)

 Code:
 training = ssc.sql(select userId,movieId,rating from ratings where
 partitionKey  6).cache()
 print type(training)
 model = ALS.train(training,rank,numIter,lmbda)

 Error:
 class 'pyspark.sql.dataframe.DataFrame'
 Rank:8 Lmbda:1.0 iteration:10

 Traceback (most recent call last):
   File D:\Project\Spark\code\movie_sql.py, line 109, in module
 bestConf = getBestModel(sc,ssc,training,validation,validationNoRating)
   File D:\Project\Spark\code\movie_sql.py, line 54, in getBestModel
 model = ALS.train(trainingRDD,rank,numIter,lmbda)
   File
 D:\spark\spark-1.3.1-bin-hadoop2.6\spark-1.3.1-bin-hadoop2.6\spark-1.3.1-bin-hadoop2.6\python\pyspark\mllib\recommendation.py,
 line 139, in train
 model = callMLlibFunc(trainALSModel, cls._prepare(ratings), rank,
 iterations,
   File
 D:\spark\spark-1.3.1-bin-hadoop2.6\spark-1.3.1-bin-hadoop2.6\spark-1.3.1-bin-hadoop2.6\python\pyspark\mllib\recommendation.py,
 line 127, in _prepare
 assert isinstance(ratings, RDD), ratings should be RDD
 AssertionError: ratings should be RDD

 --
 Best Regards,
 Ayan Guha




-- 
Best Regards,
Ayan Guha


RE:RE:maven compile error

2015-04-21 Thread Shuai Zheng
I had a similar issue (I failed on the spark-core project, with the same
exception as you). Then I followed the steps below (I am working on Windows):

Delete the local Maven repository and re-download all dependencies. The issue
sounds like a corrupted jar that can't be opened by Maven.

Other than this, I also did the steps below (I don't think they are the
solution, but I'm just describing what I did):

1. Uninstalled the Scala 2.11 version I had installed, so only 2.10.5 remains
on my PC.

2. Upgraded Maven to the latest 3.3.1.

3. Installed the latest git client.

 

Regards,

 

Shuai

 

From: myelinji [mailto:myeli...@aliyun.com] 
Sent: Friday, April 03, 2015 6:58 AM
To: Sean Owen
Cc: Spark user group
Subject: 答复:maven compile error

 

Thank you for your reply. When I use maven to compile the whole project, the
errors are as follows:

 

[INFO] Spark Project Parent POM .. SUCCESS [4.136s]
[INFO] Spark Project Networking .. SUCCESS [7.405s]
[INFO] Spark Project Shuffle Streaming Service ... SUCCESS [5.071s]
[INFO] Spark Project Core  SUCCESS [3:08.445s]
[INFO] Spark Project Bagel ... SUCCESS [21.613s]
[INFO] Spark Project GraphX .. SUCCESS [58.915s]
[INFO] Spark Project Streaming ... SUCCESS [1:26.202s]
[INFO] Spark Project Catalyst  FAILURE [1.537s]
[INFO] Spark Project SQL . SKIPPED
[INFO] Spark Project ML Library .. SKIPPED
[INFO] Spark Project Tools ... SKIPPED
[INFO] Spark Project Hive  SKIPPED
[INFO] Spark Project REPL  SKIPPED
[INFO] Spark Project Assembly  SKIPPED
[INFO] Spark Project External Twitter  SKIPPED
[INFO] Spark Project External Flume Sink . SKIPPED
[INFO] Spark Project External Flume .. SKIPPED
[INFO] Spark Project External MQTT ... SKIPPED
[INFO] Spark Project External ZeroMQ . SKIPPED
[INFO] Spark Project External Kafka .. SKIPPED
[INFO] Spark Project Examples  SKIPPED

 

It seems like there is something wrong with the catalyst project. Why can't I
compile this project?

 

 

--

From: Sean Owen so...@cloudera.com

Sent: Friday, April 3, 2015, 17:48

To: myelinji myeli...@aliyun.com

Cc: Spark user group user@spark.apache.org

Subject: Re: maven compile error

 

If you're asking about a compile error, you should include the command
you used to compile.

I am able to compile branch 1.2 successfully with mvn -DskipTests
clean package.

This error is actually an error from scalac, not a compile error from
the code. It sort of sounds like it has not been able to download
scala dependencies. Check or maybe recreate your environment.

On Fri, Apr 3, 2015 at 3:19 AM, myelinji myeli...@aliyun.com wrote:
 Hi, all:
 Just now I checked out spark-1.2 from GitHub and wanted to build it with maven;
 however, I encountered an error during compilation:

 [INFO]
 
 [ERROR] Failed to execute goal
 net.alchim31.maven:scala-maven-plugin:3.2.0:compile (scala-compile-first) on
 project spark-catalyst_2.10: wrap:
 scala.reflect.internal.MissingRequirementError: object scala.runtime in
 compiler mirror not found. - [Help 1]
 org.apache.maven.lifecycle.LifecycleExecutionException: Failed to execute
 goal net.alchim31.maven:scala-maven-plugin:3.2.0:compile
 (scala-compile-first) on project spark-catalyst_2.10: wrap:
 scala.reflect.internal.MissingRequirementError: object scala.runtime in
 compiler mirror not found.
 at
 org.apache.maven.lifecycle.internal.MojoExecutor.execute(MojoExecutor.java:217)
 at
 org.apache.maven.lifecycle.internal.MojoExecutor.execute(MojoExecutor.java:153)
 at
 org.apache.maven.lifecycle.internal.MojoExecutor.execute(MojoExecutor.java:145)
 at
 org.apache.maven.lifecycle.internal.LifecycleModuleBuilder.buildProject(LifecycleModuleBuilder.java:84)
 at
 org.apache.maven.lifecycle.internal.LifecycleModuleBuilder.buildProject(LifecycleModuleBuilder.java:59)
 at
 org.apache.maven.lifecycle.internal.LifecycleStarter.singleThreadedBuild(LifecycleStarter.java:183)
 at
 org.apache.maven.lifecycle.internal.LifecycleStarter.execute(LifecycleStarter.java:161)
 at org.apache.maven.DefaultMaven.doExecute(DefaultMaven.java:320)
 at org.apache.maven.DefaultMaven.execute(DefaultMaven.java:156)
 at org.apache.maven.cli.MavenCli.execute(MavenCli.java:537)
 at org.apache.maven.cli.MavenCli.doMain(MavenCli.java:196)
 at org.apache.maven.cli.MavenCli.main(MavenCli.java:141)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at
 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
 at

Re: Understanding the build params for spark with sbt.

2015-04-21 Thread Sree V
Hi Shiyao,
From the same page you referred to: "Maven is the official recommendation for
packaging Spark, and is the “build of reference”. But SBT is supported for
day-to-day development since it can provide much faster iterative compilation.
More advanced developers may wish to use SBT."

For maven, pom.xml is the main and important file.

-P stands for Profile. Search for 'profile' in spark/pom.xml. More on it:
http://maven.apache.org/guides/introduction/introduction-to-profiles.html

-D stands for Define; maven takes it from Java or earlier languages. It is a way
to pass system properties and/or override existing properties from the build file.

Core build: spark/core/pom.xml is your build file for building only Spark-Core.

Thanking you.

With Regards
Sree 


 On Tuesday, April 21, 2015 12:12 AM, Akhil Das 
ak...@sigmoidanalytics.com wrote:
   

 With maven you could do it like:

mvn -Dhadoop.version=2.3.0 -DskipTests clean package -pl core

Thanks
Best Regards
On Mon, Apr 20, 2015 at 8:10 PM, Shiyao Ma i...@introo.me wrote:

Hi.

My usage is only about the spark core and hdfs, so no spark sql or
mlib or other components invovled.


I saw the hint on the
http://spark.apache.org/docs/latest/building-spark.html, with a sample
like:
build/sbt -Pyarn -Phadoop-2.3 assembly. (what's the -P for?)


Fundamentally, I'd like to let sbt only compile and package the core
and the hadoop.

Meanwhile, it would be appreciated if you could inform me what's the
scala file that controls the logic of -Pyarn, so that I can dig into
the build source and have a finer control.



Thanks.

--

I am a cat. My homepage is http://introo.me.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org





  

Re: HiveContext setConf seems not stable

2015-04-21 Thread Michael Armbrust
As a workaround, can you call getConf first before any setConf?
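For example, something like the following (property names copied from the snippet below; the initial getConf is only there to force the configuration to be read once before the first setConf):

import org.apache.spark.SparkContext
import org.apache.spark.sql.hive.HiveContext

def initHiveContext(sc: SparkContext): HiveContext = {
  val hc = new HiveContext(sc)
  // Workaround: read any property first, then apply the overrides.
  hc.getConf("hive.exec.compress.output", "false")
  hc.setConf("hive.exec.compress.output", "true")
  hc.setConf("mapreduce.output.fileoutputformat.compress.codec",
    "org.apache.hadoop.io.compress.SnappyCodec")
  hc.setConf("mapreduce.output.fileoutputformat.compress.type", "BLOCK")
  hc
}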

On Tue, Apr 21, 2015 at 1:58 AM, Ophir Cohen oph...@gmail.com wrote:

 I think I am hitting the same problem; I'm trying to turn on Hive output
 compression.
 I have the following lines:
 def initHiveContext(sc: SparkContext): HiveContext = {
   val hc: HiveContext = new HiveContext(sc)
   hc.setConf("hive.exec.compress.output", "true")
   hc.setConf("mapreduce.output.fileoutputformat.compress.codec",
     "org.apache.hadoop.io.compress.SnappyCodec")
   hc.setConf("mapreduce.output.fileoutputformat.compress.type", "BLOCK")

   logger.info(hc.getConf("hive.exec.compress.output"))
   logger.info(hc.getConf("mapreduce.output.fileoutputformat.compress.codec"))
   logger.info(hc.getConf("mapreduce.output.fileoutputformat.compress.type"))

   hc
 }
 And the log for calling it twice:
 15/04/21 08:37:39 INFO util.SchemaRDDUtils$: false
 15/04/21 08:37:39 INFO util.SchemaRDDUtils$:
 org.apache.hadoop.io.compress.SnappyCodec
 15/04/21 08:37:39 INFO util.SchemaRDDUtils$: BLOCK
 15/04/21 08:37:39 INFO util.SchemaRDDUtils$: true
 15/04/21 08:37:39 INFO util.SchemaRDDUtils$:
 org.apache.hadoop.io.compress.SnappyCodec
 15/04/21 08:37:39 INFO util.SchemaRDDUtils$: BLOCK

 BTW
 It worked on 1.2.1...


 On Thu, Apr 2, 2015 at 11:47 AM, Hao Ren inv...@gmail.com wrote:

 Hi,

 Jira created: https://issues.apache.org/jira/browse/SPARK-6675

 Thank you.


 On Wed, Apr 1, 2015 at 7:50 PM, Michael Armbrust mich...@databricks.com
 wrote:

 Can you open a JIRA please?

 On Wed, Apr 1, 2015 at 9:38 AM, Hao Ren inv...@gmail.com wrote:

 Hi,

 I find HiveContext.setConf does not work correctly. Here are some code
 snippets showing the problem:

 snippet 1:

 
 import org.apache.spark.sql.hive.HiveContext
 import org.apache.spark.{SparkConf, SparkContext}

 object Main extends App {

   val conf = new SparkConf()
 .setAppName(context-test)
 .setMaster(local[8])
   val sc = new SparkContext(conf)
   val hc = new HiveContext(sc)

    hc.setConf("spark.sql.shuffle.partitions", "10")
    hc.setConf("hive.metastore.warehouse.dir",
  "/home/spark/hive/warehouse_test")
    hc.getAllConfs filter (_._1.contains("warehouse.dir")) foreach println
    hc.getAllConfs filter (_._1.contains("shuffle.partitions")) foreach
  println
 }

 

 *Results:*
 (hive.metastore.warehouse.dir,/home/spark/hive/warehouse_test)
 (spark.sql.shuffle.partitions,10)

 snippet 2:

 
 ...
    hc.setConf("hive.metastore.warehouse.dir",
  "/home/spark/hive/warehouse_test")
    hc.setConf("spark.sql.shuffle.partitions", "10")
    hc.getAllConfs filter (_._1.contains("warehouse.dir")) foreach println
    hc.getAllConfs filter (_._1.contains("shuffle.partitions")) foreach
  println
 ...

 

 *Results:*
 (hive.metastore.warehouse.dir,/user/hive/warehouse)
 (spark.sql.shuffle.partitions,10)

 *You can see that I just permuted the two setConf call, then that leads
 to two different Hive configuration.*
 *It seems that HiveContext can not set a new value on
 hive.metastore.warehouse.dir key in one or the first setConf call.*
 *You need another setConf call before changing
 hive.metastore.warehouse.dir. For example, set
 hive.metastore.warehouse.dir twice and the snippet 1*

 snippet 3:

 
 ...
    hc.setConf("hive.metastore.warehouse.dir",
  "/home/spark/hive/warehouse_test")
    hc.setConf("hive.metastore.warehouse.dir",
  "/home/spark/hive/warehouse_test")
    hc.getAllConfs filter (_._1.contains("warehouse.dir")) foreach println
 ...

 

 *Results:*
 (hive.metastore.warehouse.dir,/home/spark/hive/warehouse_test)


 *You can reproduce this if you move to the latest branch-1.3
 (1.3.1-snapshot, htag = 7d029cb1eb6f1df1bce1a3f5784fb7ce2f981a33)*

 *I have also tested the released 1.3.0 (htag =
 4aaf48d46d13129f0f9bdafd771dd80fe568a7dc). It has the same problem.*

 *Please tell me if I am missing something. Any help is highly
 appreciated.*

 Hao

 --
 Hao Ren

 {Data, Software} Engineer @ ClaraVista

 Paris, France





 --
 Hao Ren

 {Data, Software} Engineer @ ClaraVista

 Paris, France





Re: Task not Serializable: Graph is unexpectedly null when DStream is being serialized

2015-04-21 Thread Jean-Pascal Billaud
At this point I am assuming that nobody has an idea... I am still going to
give it a last shot just in case it was missed by some people :)

Thanks,

On Mon, Apr 20, 2015 at 2:20 PM, Jean-Pascal Billaud j...@tellapart.com
wrote:

 Hey, so I start the context at the very end when all the piping is done.
 BTW a foreachRDD will be called on the resulting dstream.map() right after
 that.

 The puzzling thing is why removing the context bounds solve the problem...
 What does this exception mean in general?

 On Mon, Apr 20, 2015 at 1:33 PM, Tathagata Das t...@databricks.com
 wrote:

 When are you getting this exception? After starting the context?

 TD

 On Mon, Apr 20, 2015 at 10:44 AM, Jean-Pascal Billaud j...@tellapart.com
 wrote:

 Hi,

 I am getting this serialization exception and I am not too sure what
 Graph is unexpectedly null when DStream is being serialized means?

 15/04/20 06:12:38 INFO yarn.ApplicationMaster: Final app status: FAILED,
 exitCode: 15, (reason: User class threw exception: Task not serializable)
 Exception in thread Driver org.apache.spark.SparkException: Task not
 serializable
 at org.apache.spark.util.ClosureCleaner$.ensureSerializable(
 ClosureCleaner.scala:166)
 at org.apache.spark.util.ClosureCleaner$.clean(
 ClosureCleaner.scala:158)
 at org.apache.spark.SparkContext.clean(SparkContext.scala:1435)
 at org.apache.spark.streaming.dstream.DStream.map(DStream.
 scala:438)
 [...]
 Caused by: java.io.NotSerializableException: Graph is unexpectedly null
 when DStream is being serialized.
 at org.apache.spark.streaming.dstream.DStream$anonfun$
 writeObject$1.apply$mcV$sp(DStream.scala:420)
 at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:
 985)
 at org.apache.spark.streaming.dstream.DStream.writeObject(
 DStream.scala:403)

 The operation comes down to something like this:

 dstream.map(tuple = {
 val w = StreamState.fetch[K,W](state.prefixKey, tuple._1)
 (tuple._1, (tuple._2, w)) })

 And StreamState being a very simple standalone object:

 object StreamState {
   def fetch[K : ClassTag : Ordering, V : ClassTag](prefixKey: String,
 key: K) : Option[V] = None
 }

 However if I remove the context bounds from K in fetch e.g. removing
 ClassTag and Ordering then everything is fine.

 If anyone has some pointers, I'd really appreciate it.

 Thanks,






Re: SparkSQL performance

2015-04-21 Thread Renato Marroquín Mogrovejo
Thanks Michael!
I have tried applying my schema programmatically but I didn't get any
improvement in performance :(
Could you point me to some code examples using Avro please?
Many thanks again!


Renato M.

2015-04-21 20:45 GMT+02:00 Michael Armbrust mich...@databricks.com:

 Here is an example using rows directly:

 https://spark.apache.org/docs/1.3.0/sql-programming-guide.html#programmatically-specifying-the-schema

 Avro or parquet input would likely give you the best performance.
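 A compact Scala sketch of that pattern (file path and column names are placeholders):

 import org.apache.spark.sql.{Row, SQLContext}
 import org.apache.spark.sql.types.{DoubleType, StringType, StructField, StructType}

 val sqlContext = new SQLContext(sc)

 // Describe the schema explicitly instead of going through JavaBeans.
 val schema = StructType(Seq(
   StructField("name", StringType, nullable = true),
   StructField("attribute1", DoubleType, nullable = true)))

 val rowRDD = sc.textFile("data.csv")
   .map(_.split(","))
   .map(p => Row(p(0), p(1).toDouble))

 val tableX = sqlContext.createDataFrame(rowRDD, schema)
 tableX.registerTempTable("tableX")
 sqlContext.sql("SELECT * FROM tableX WHERE attribute1 BETWEEN 0 AND 5").count()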

 On Tue, Apr 21, 2015 at 4:28 AM, Renato Marroquín Mogrovejo 
 renatoj.marroq...@gmail.com wrote:

 Thanks for the hints guys! much appreciated!
 Even if I just do a something like:

 Select * from tableX where attribute1  5

 I see similar behaviour.

 @Michael
 Could you point me to any sample code that uses Spark's Rows? We are at a
 phase where we can actually change our JavaBeans for something that
 provides a better performance than what we are seeing now. Would you
 recommend using Avro presentation then?
 Thanks again!


 Renato M.

 2015-04-21 1:18 GMT+02:00 Michael Armbrust mich...@databricks.com:

 There is a cost to converting from JavaBeans to Rows and this code path
 has not been optimized.  That is likely what you are seeing.

 On Mon, Apr 20, 2015 at 3:55 PM, ayan guha guha.a...@gmail.com wrote:

 SparkSQL optimizes better by column pruning and predicate pushdown,
 primarily. Here you are not taking advantage of either.

 I am curious to know what goes in your filter function, as you are not
 using a filter in SQL side.

 Best
 Ayan
 On 21 Apr 2015 08:05, Renato Marroquín Mogrovejo 
 renatoj.marroq...@gmail.com wrote:

 Does anybody have an idea? a clue? a hint?
 Thanks!


 Renato M.

 2015-04-20 9:31 GMT+02:00 Renato Marroquín Mogrovejo 
 renatoj.marroq...@gmail.com:

 Hi all,

 I have a simple query Select * from tableX where attribute1 between
 0 and 5 that I run over a Kryo file with four partitions that ends up
 being around 3.5 million rows in our case.
 If I run this query by doing a simple map().filter() it takes around
 ~9.6 seconds but when I apply schema, register the table into a 
 SqlContext,
 and then run the query, it takes around ~16 seconds. This is using Spark
 1.2.1 with Scala 2.10.0
 I am wondering why there is such a big gap on performance if it is
 just a filter. Internally, the relation files are mapped to a JavaBean.
 This different data presentation (JavaBeans vs SparkSQL internal
 representation) could lead to such difference? Is there anything I could 
 do
 to make the performance get closer to the hard-coded option?
 Thanks in advance for any suggestions or ideas.


 Renato M.








Re: Features scaling

2015-04-21 Thread DB Tsai
Hi Denys,

I don't see any issue in your Python code, so maybe there is a bug in the
Python wrapper. If you do it in Scala, I think it should work. BTW,
LogisticRegressionWithLBFGS does the standardization internally, so you
don't need to do it yourself. It's worth giving it a try!
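In Scala that would look roughly like this (binary classification assumed; no manual StandardScaler step):

import org.apache.spark.mllib.classification.LogisticRegressionWithLBFGS
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.rdd.RDD

// `dataset` is assumed to be an RDD[LabeledPoint]; the LBFGS-based trainer
// standardizes the features internally before optimizing.
def trainModel(dataset: RDD[LabeledPoint]) =
  new LogisticRegressionWithLBFGS()
    .setNumClasses(2)
    .run(dataset)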

Sincerely,

DB Tsai
---
Blog: https://www.dbtsai.com


On Tue, Apr 21, 2015 at 1:00 AM, Denys Kozyr dko...@gmail.com wrote:
 Hi!

 I want to normalize features before train logistic regression. I setup scaler:

 scaler2 = StandardScaler(withMean=True, withStd=True).fit(features)

 and apply it to a dataset:

 scaledData = dataset.map(lambda x: LabeledPoint(x.label,
 scaler2.transform(Vectors.dense(x.features.toArray() 

 but I can't work with scaledData (can't output it or train regression
 on it), got an error:

 Exception: It appears that you are attempting to reference SparkContext from 
 a b
 roadcast variable, action, or transforamtion. SparkContext can only be used 
 on t
 he driver, not in code that it run on workers. For more information, see 
 SPARK-5
 063.

 Does it correct code to make normalization? Why it doesn't work?
 Any advices are welcome.
 Thanks.

 Full code:
 https://gist.github.com/dkozyr/d31551a3ebed0ee17772

 Console output:
 https://gist.github.com/dkozyr/199f0d4f44cf522f9453

 Denys

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark Performance on Yarn

2015-04-21 Thread hnahak
Try --executor-memory 5g, because you have 8 GB of RAM in each machine.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Performance-on-Yarn-tp21729p22603.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark Streaming updatyeStateByKey throws OutOfMemory Error

2015-04-21 Thread Olivier Girardot
Hi Sourav,
Can you post your updateFunc as well please ?

Regards,

Olivier.
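
For reference while that is posted, the function passed to updateStateByKey has roughly the shape below. This is only a generic sketch with made-up state types, not the application's actual updateFunc; the relevant point is that returning None for a key removes it from the state instead of letting the state grow without bound.

// keyedStream is assumed to be a DStream[(String, Long)]
val updateFunc = (newValues: Seq[Long], state: Option[Long]) => {
  val updated = state.getOrElse(0L) + newValues.sum
  if (updated == 0L) None else Some(updated)   // None drops the key from the state
}

val totals = keyedStream.updateStateByKey[Long](updateFunc)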

On Tue, 21 Apr 2015 at 12:48, Sourav Chandra sourav.chan...@livestream.com
wrote:

 Hi,

 We are building a spark streaming application which reads from kafka, does
 updateStateBykey based on the received message type and finally stores into
 redis.

 After running for a few seconds the executor process gets killed by throwing an
 OutOfMemory error.

 The code snippet is below:


 NoOfReceiverInstances = 1

 val kafkaStreams = (1 to NoOfReceiverInstances).map(
   _ => KafkaUtils.createStream(ssc, ZKQuorum, ConsumerGroup, TopicsMap)
 )
 val updateFunc = (values: Seq[IConcurrentUsers], state: Option[(Long,
 Long)]) => {...}


 ssc.union(kafkaStreams).map(KafkaMessageMapper(_)).filter(...).updateStateByKey(updateFunc).foreachRDD(_.foreachPartition(RedisHelper.update(_)))



 object RedisHelper {
   private val client = scredis.Redis(
     ConfigFactory.parseProperties(System.getProperties).getConfig(namespace)
   )

   def update(itr: Iterator[(String, (Long, Long))]) {
     // redis save operation
   }

 }


 Below is the spark configuration:


 spark.app.name = XXX
 spark.jars = .jar
 spark.home = /spark-1.1.1-bin-hadoop2.4
 spark.executor.memory = 1g
 spark.streaming.concurrentJobs = 1000
 spark.logConf = true
 spark.cleaner.ttl = 3600 //in milliseconds
 spark.default.parallelism = 12
 spark.executor.extraJavaOptions = -Xloggc:gc.log -XX:+PrintGCDetails
 -XX:+UseConcMarkSweepGC -XX:HeapDumpPath=1.hprof
 -XX:+HeapDumpOnOutOfMemoryError
 spark.executor.logs.rolling.strategy = size
 spark.executor.logs.rolling.size.maxBytes = 104857600 // 100 MB
 spark.executor.logs.rolling.maxRetainedFiles = 10
 spark.serializer = org.apache.spark.serializer.KryoSerializer
 spark.kryo.registrator = xxx.NoOpKryoRegistrator


 other configurations are below

 streaming {
 // All streaming context related configs should come here
 batch-duration = 1 second
 checkpoint-directory = /tmp
 checkpoint-duration = 10 seconds
 slide-duration = 1 second
 window-duration = 1 second
 partitions-for-shuffle-task = 32
   }
   kafka {
 no-of-receivers = 1
 zookeeper-quorum = :2181
 consumer-group = x
 topic = x:2
   }

 We tried different combinations like
  - with spark 1.1.0 and 1.1.1.
  - by increasing executor memory
  - by changing the serialization strategy (switching between kryo and
 normal java)
  - by changing broadcast strategy (switching between http and torrent
 broadcast)


 Can anyone give any insight what we are missing here? How can we fix this?

 Due to akka version mismatch with some other libraries we cannot upgrade
 the spark version.

 Thanks,
 --

 Sourav Chandra

 Senior Software Engineer

 · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · ·

 sourav.chan...@livestream.com

 o: +91 80 4121 8723

 m: +91 988 699 3746

 skype: sourav.chandra

 Livestream

 Ajmera Summit, First Floor, #3/D, 68 Ward, 3rd Cross, 7th C Main, 3rd
 Block, Koramangala Industrial Area,

 Bangalore 560034

 www.livestream.com



Re: Spark 1.3.1 Dataframe breaking ALS.train?

2015-04-21 Thread ayan guha
Thank you all.
On 22 Apr 2015 04:29, Xiangrui Meng men...@gmail.com wrote:

 SchemaRDD subclasses RDD in 1.2, but DataFrame is no longer an RDD in
 1.3. We should allow DataFrames in ALS.train. I will submit a patch.
 You can use `ALS.train(training.rdd, ...)` for now as a workaround.
 -Xiangrui

 On Tue, Apr 21, 2015 at 10:51 AM, Joseph Bradley jos...@databricks.com
 wrote:
  Hi Ayan,
 
  If you want to use DataFrame, then you should use the Pipelines API
  (org.apache.spark.ml.*) which will take DataFrames:
 
 http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.ml.recommendation.ALS
 
  In the examples/ directory for ml/, you can find a MovieLensALS example.
 
  Good luck!
  Joseph
 
  On Tue, Apr 21, 2015 at 4:58 AM, ayan guha guha.a...@gmail.com wrote:
 
  Hi
 
  I am getting an error
 
  Also, I am getting an error in mlib.ALS.train function when passing
  dataframe (do I need to convert the DF to RDD?)
 
  Code:
  training = ssc.sql(select userId,movieId,rating from ratings where
  partitionKey  6).cache()
  print type(training)
  model = ALS.train(training,rank,numIter,lmbda)
 
  Error:
  class 'pyspark.sql.dataframe.DataFrame'
 
  Traceback (most recent call last):
File D:\Project\Spark\code\movie_sql.py, line 109, in module
  bestConf =
 getBestModel(sc,ssc,training,validation,validationNoRating)
File D:\Project\Spark\code\movie_sql.py, line 54, in getBestModel
  model = ALS.train(trainingRDD,rank,numIter,lmbda)
File
 
 D:\spark\spark-1.3.1-bin-hadoop2.6\spark-1.3.1-bin-hadoop2.6\spark-1.3.1-bin-hadoop2.6\python\pyspark\mllib\recommendation.py,
  line 139, in train
  model = callMLlibFunc(trainALSModel, cls._prepare(ratings), rank,
  iterations,
File
 
 D:\spark\spark-1.3.1-bin-hadoop2.6\spark-1.3.1-bin-hadoop2.6\spark-1.3.1-bin-hadoop2.6\python\pyspark\mllib\recommendation.py,
  line 127, in _prepare
  assert isinstance(ratings, RDD), ratings should be RDD
  AssertionError: ratings should be RDD
 
  It was working fine in 1.2.0 (till last night :))
 
  Any solution? I am thinking to map the training dataframe back to a RDD,
  byt will lose the schema information.
 
  Best
  Ayan
 
  On Mon, Apr 20, 2015 at 10:23 PM, ayan guha guha.a...@gmail.com
 wrote:
 
  Hi
  Just upgraded to Spark 1.3.1.
 
  I am getting an warning
 
  Warning (from warnings module):
File
 
 D:\spark\spark-1.3.1-bin-hadoop2.6\spark-1.3.1-bin-hadoop2.6\spark-1.3.1-bin-hadoop2.6\python\pyspark\sql\context.py,
  line 191
  warnings.warn(inferSchema is deprecated, please use
 createDataFrame
  instead)
  UserWarning: inferSchema is deprecated, please use createDataFrame
  instead
 
  However, documentation still says to use inferSchema.
  Here: http://spark.apache.org/docs/latest/sql-programming-guide.htm in
  section
 
  Also, I am getting an error in mlib.ALS.train function when passing
  dataframe (do I need to convert the DF to RDD?)
 
  Code:
  training = ssc.sql(select userId,movieId,rating from ratings where
  partitionKey  6).cache()
  print type(training)
  model = ALS.train(training,rank,numIter,lmbda)
 
  Error:
  class 'pyspark.sql.dataframe.DataFrame'
  Rank:8 Lmbda:1.0 iteration:10
 
  Traceback (most recent call last):
File D:\Project\Spark\code\movie_sql.py, line 109, in module
  bestConf =
  getBestModel(sc,ssc,training,validation,validationNoRating)
File D:\Project\Spark\code\movie_sql.py, line 54, in getBestModel
  model = ALS.train(trainingRDD,rank,numIter,lmbda)
File
 
 D:\spark\spark-1.3.1-bin-hadoop2.6\spark-1.3.1-bin-hadoop2.6\spark-1.3.1-bin-hadoop2.6\python\pyspark\mllib\recommendation.py,
  line 139, in train
  model = callMLlibFunc(trainALSModel, cls._prepare(ratings), rank,
  iterations,
File
 
 D:\spark\spark-1.3.1-bin-hadoop2.6\spark-1.3.1-bin-hadoop2.6\spark-1.3.1-bin-hadoop2.6\python\pyspark\mllib\recommendation.py,
  line 127, in _prepare
  assert isinstance(ratings, RDD), ratings should be RDD
  AssertionError: ratings should be RDD
 
  --
  Best Regards,
  Ayan Guha
 
 
 
 
  --
  Best Regards,
  Ayan Guha
 
 



Re: how to make a spark cluster ?

2015-04-21 Thread haihar nahak
I did a performance check of PageRank on the soc-LiveJournal data, comparing my local
machine (8 cores, 16 GB) in local mode against my small cluster (4 nodes, 12
cores, 40 GB), and I found that cluster mode is way faster than local mode, which
confused me.

no. of iterations | Local mode (in mins) | cluster mode (in mins)
1                 | 20                   | 1
2                 | 31.3                 | 1.2
3                 | 39.5                 | 1.3
5                 | 56.4                 | 1.6
10                | 117.26               | 2.6

With the help of this, I think I might install a Spark cluster on the
same machine and, instead of giving local[no. of cores], set the master to
spark://host:7070.

Please let me know if I am wrong somewhere.
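
For reference, the two ways of setting the master that are being compared here look like this (a minimal sketch; the app names, host and core count are placeholders, and note that the standalone master's default port is 7077):

import org.apache.spark.{SparkConf, SparkContext}

// local mode: a single JVM running N worker threads
val localConf = new SparkConf().setAppName("PageRank-local").setMaster("local[8]")

// standalone cluster mode: point at the master started with sbin/start-master.sh / start-slaves.sh
val clusterConf = new SparkConf().setAppName("PageRank-cluster").setMaster("spark://host:7077")

val sc = new SparkContext(localConf)   // or clusterConf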


On Tue, Apr 21, 2015 at 6:27 PM, Reynold Xin r...@databricks.com wrote:

 Actually if you only have one machine, just use the Spark local mode.

 Just download the Spark tarball, untar it, set master to local[N], where N
 = number of cores. You are good to go. There is no setup of job tracker or
 Hadoop.


 On Mon, Apr 20, 2015 at 3:21 PM, haihar nahak harihar1...@gmail.com
 wrote:

 Thank you :)

 On Mon, Apr 20, 2015 at 4:46 PM, Jörn Franke jornfra...@gmail.com
 wrote:

 Hi, If you have just one physical machine then I would try out Docker
 instead of a full VM (would be waste of memory and CPU).

 Best regards
 On 20 Apr 2015 00:11, hnahak harihar1...@gmail.com wrote:

 Hi All,

 I've big physical machine with 16 CPUs , 256 GB RAM, 20 TB Hard disk. I
 just
 need to know what should be the best solution to make a spark cluster?

 If I need to process TBs of data then
 1. Only one machine, which contain driver, executor, job tracker and
 task
 tracker everything.
 2. create 4 VMs and each VM should consist 4 CPUs , 64 GB RAM
 3. create 8 VMs and each VM should consist 2 CPUs , 32 GB RAM each

 please give me your views/suggestions



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/how-to-make-a-spark-cluster-tp22563.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




 --
 {{{H2N}}}-(@:





-- 
{{{H2N}}}-(@:


Re: Task not Serializable: Graph is unexpectedly null when DStream is being serialized

2015-04-21 Thread Jean-Pascal Billaud
Sure. But in general, I am assuming this Graph is unexpectedly null when
DStream is being serialized must mean something. Under which
circumstances, such an exception would trigger?

On Tue, Apr 21, 2015 at 4:12 PM, Tathagata Das t...@databricks.com wrote:

 Yeah, I am not sure what is going on. The only way to figure to take a
 look at the disassembled bytecodes using javap.

 TD

 On Tue, Apr 21, 2015 at 1:53 PM, Jean-Pascal Billaud j...@tellapart.com
 wrote:

 At this point I am assuming that nobody has an idea... I am still going
 to give it a last shot just in case it was missed by some people :)

 Thanks,

 On Mon, Apr 20, 2015 at 2:20 PM, Jean-Pascal Billaud j...@tellapart.com
 wrote:

 Hey, so I start the context at the very end when all the piping is done.
 BTW a foreachRDD will be called on the resulting dstream.map() right after
 that.

 The puzzling thing is why removing the context bounds solve the
 problem... What does this exception mean in general?

 On Mon, Apr 20, 2015 at 1:33 PM, Tathagata Das t...@databricks.com
 wrote:

 When are you getting this exception? After starting the context?

 TD

 On Mon, Apr 20, 2015 at 10:44 AM, Jean-Pascal Billaud j...@tellapart.com
  wrote:

 Hi,

 I am getting this serialization exception and I am not too sure what
 Graph is unexpectedly null when DStream is being serialized means?

 15/04/20 06:12:38 INFO yarn.ApplicationMaster: Final app status:
 FAILED, exitCode: 15, (reason: User class threw exception: Task not
 serializable)
 Exception in thread Driver org.apache.spark.SparkException: Task
 not serializable
 at org.apache.spark.util.ClosureCleaner$.ensureSerializable(
 ClosureCleaner.scala:166)
 at org.apache.spark.util.ClosureCleaner$.clean(
 ClosureCleaner.scala:158)
 at org.apache.spark.SparkContext.
 clean(SparkContext.scala:1435)
 at org.apache.spark.streaming.dstream.DStream.map(DStream.
 scala:438)
 [...]
 Caused by: java.io.NotSerializableException: Graph is unexpectedly
 null when DStream is being serialized.
 at org.apache.spark.streaming.dstream.DStream$anonfun$
 writeObject$1.apply$mcV$sp(DStream.scala:420)
 at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:
 985)
 at org.apache.spark.streaming.dstream.DStream.writeObject(
 DStream.scala:403)

 The operation comes down to something like this:

 dstream.map(tuple => {
 val w = StreamState.fetch[K,W](state.prefixKey, tuple._1)
 (tuple._1, (tuple._2, w)) })

 And StreamState being a very simple standalone object:

 object StreamState {
   def fetch[K : ClassTag : Ordering, V : ClassTag](prefixKey: String,
 key: K) : Option[V] = None
 }

 However if I remove the context bounds from K in fetch e.g. removing
 ClassTag and Ordering then everything is fine.

 If anyone has some pointers, I'd really appreciate it.

 Thanks,








Re: Task not Serializable: Graph is unexpectedly null when DStream is being serialized

2015-04-21 Thread Tathagata Das
It is kind of unexpected; I can't imagine a real scenario under which it
should trigger. But obviously I am missing something :)

TD

On Tue, Apr 21, 2015 at 4:23 PM, Jean-Pascal Billaud j...@tellapart.com
wrote:

 Sure. But in general, I am assuming this Graph is unexpectedly null
 when DStream is being serialized must mean something. Under which
 circumstances, such an exception would trigger?

 On Tue, Apr 21, 2015 at 4:12 PM, Tathagata Das t...@databricks.com
 wrote:

 Yeah, I am not sure what is going on. The only way to figure to take a
 look at the disassembled bytecodes using javap.

 TD

 On Tue, Apr 21, 2015 at 1:53 PM, Jean-Pascal Billaud j...@tellapart.com
 wrote:

 At this point I am assuming that nobody has an idea... I am still going
 to give it a last shot just in case it was missed by some people :)

 Thanks,

 On Mon, Apr 20, 2015 at 2:20 PM, Jean-Pascal Billaud j...@tellapart.com
 wrote:

 Hey, so I start the context at the very end when all the piping is
 done. BTW a foreachRDD will be called on the resulting dstream.map() right
 after that.

 The puzzling thing is why removing the context bounds solve the
 problem... What does this exception mean in general?

 On Mon, Apr 20, 2015 at 1:33 PM, Tathagata Das t...@databricks.com
 wrote:

 When are you getting this exception? After starting the context?

 TD

 On Mon, Apr 20, 2015 at 10:44 AM, Jean-Pascal Billaud 
 j...@tellapart.com wrote:

 Hi,

 I am getting this serialization exception and I am not too sure what
 Graph is unexpectedly null when DStream is being serialized means?

 15/04/20 06:12:38 INFO yarn.ApplicationMaster: Final app status:
 FAILED, exitCode: 15, (reason: User class threw exception: Task not
 serializable)
 Exception in thread Driver org.apache.spark.SparkException: Task
 not serializable
 at org.apache.spark.util.ClosureCleaner$.ensureSerializable(
 ClosureCleaner.scala:166)
 at org.apache.spark.util.ClosureCleaner$.clean(
 ClosureCleaner.scala:158)
 at org.apache.spark.SparkContext.
 clean(SparkContext.scala:1435)
 at org.apache.spark.streaming.dstream.DStream.map(DStream.
 scala:438)
 [...]
 Caused by: java.io.NotSerializableException: Graph is unexpectedly
 null when DStream is being serialized.
 at org.apache.spark.streaming.dstream.DStream$anonfun$
 writeObject$1.apply$mcV$sp(DStream.scala:420)
 at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:
 985)
 at org.apache.spark.streaming.dstream.DStream.writeObject(
 DStream.scala:403)

 The operation comes down to something like this:

 dstream.map(tuple => {
 val w = StreamState.fetch[K,W](state.prefixKey, tuple._1)
 (tuple._1, (tuple._2, w)) })

 And StreamState being a very simple standalone object:

 object StreamState {
   def fetch[K : ClassTag : Ordering, V : ClassTag](prefixKey: String,
 key: K) : Option[V] = None
 }

 However if I remove the context bounds from K in fetch e.g. removing
 ClassTag and Ordering then everything is fine.

 If anyone has some pointers, I'd really appreciate it.

 Thanks,









Re: sparksql - HiveConf not found during task deserialization

2015-04-21 Thread Manku Timma
Akhil, Thanks for the suggestions.
I tried out sc.addJar, --jars, --conf spark.executor.extraClassPath and
none of them helped. I added stuff into compute-classpath.sh. That did not
change anything. I checked the classpath of the running executor and made
sure that the hive jars are in that dir. For me the most confusing thing is
that the executor can actually create HiveConf objects, but then cannot
find the class when the task deserializer is at work.
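
One quick way to see what classpath the executor JVMs themselves are using (a diagnostic sketch only, not a fix) is to ask them from a job:

// runs on the executors and reports each JVM's classpath back to the driver
sc.parallelize(1 to 4, 4)
  .map(_ => java.net.InetAddress.getLocalHost.getHostName + " -> " +
            System.getProperty("java.class.path"))
  .distinct()
  .collect()
  .foreach(println)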

On 20 April 2015 at 14:18, Akhil Das ak...@sigmoidanalytics.com wrote:

 Can you try sc.addJar(/path/to/your/hive/jar), i think it will resolve
 it.

 Thanks
 Best Regards

 On Mon, Apr 20, 2015 at 12:26 PM, Manku Timma manku.tim...@gmail.com
 wrote:

 Akhil,
 But the first case of creating HiveConf on the executor works fine (map
 case). Only the second case fails. I was suspecting some foul play with
 classloaders.

 On 20 April 2015 at 12:20, Akhil Das ak...@sigmoidanalytics.com wrote:

 Looks like a missing jar, try to print the classpath and make sure the
 hive jar is present.

 Thanks
 Best Regards

 On Mon, Apr 20, 2015 at 11:52 AM, Manku Timma manku.tim...@gmail.com
 wrote:

 I am using spark-1.3 with hadoop-provided and hive-provided and
 hive-0.13.1 profiles. I am running a simple spark job on a yarn cluster by
 adding all hadoop2 and hive13 jars to the spark classpaths.

 If I remove the hive-provided while building spark, I dont face any
 issue. But with hive-provided I am getting a
 java.lang.NoClassDefFoundError: org/apache/hadoop/hive/conf/HiveConf in
 the yarn executor.

 Code is below:
 import org.apache.spark._
 import org.apache.spark.sql._
 import org.apache.hadoop.hive.conf.HiveConf

 object Simple {
   def main(args: Array[String]) = {
 val sc = new SparkContext(new SparkConf())
 val sqlC = new  org.apache.spark.sql.hive.HiveContext(sc)

 val x = sc.parallelize(1 to 2).map(x =>
   { val h = new HiveConf; h.getBoolean("hive.test", false) })
 x.collect.foreach(x => println(s"-  $x"))

 val result = sqlC.sql(
   "select * from products_avro order by month, name, price"
   )
 result.collect.foreach(println)
   }
 }

 The first job (involving map) runs fine. HiveConf is instantiated and
 the conf variable is looked up etc. But the second job (involving the
 select * query) throws the class not found exception.

 The task deserializer is the one throwing the exception. It is unable
 to find the class in its classpath. Not sure what is different from the
 first job which also involved HiveConf.

 157573 [task-result-getter-3] 2015/04/20 11:01:48:287 WARN
 TaskSetManager: Lost task 0.2 in stage 2.0 (TID 4, localhost):
 java.lang.NoClassDefFoundError: org/apache/hadoop/hive/conf/HiveConf
 at java.lang.Class.getDeclaredFields0(Native Method)
 at java.lang.Class.privateGetDeclaredFields(Class.java:2436)
 at java.lang.Class.getDeclaredField(Class.java:1946)
 at
 java.io.ObjectStreamClass.getDeclaredSUID(ObjectStreamClass.java:1659)
 at java.io.ObjectStreamClass.access$700(ObjectStreamClass.java:72)
 at java.io.ObjectStreamClass$2.run(ObjectStreamClass.java:480)
 at java.io.ObjectStreamClass$2.run(ObjectStreamClass.java:468)
 at java.security.AccessController.doPrivileged(Native Method)
 at java.io.ObjectStreamClass.init(ObjectStreamClass.java:468)
 at java.io.ObjectStreamClass.lookup(ObjectStreamClass.java:365)
 at
 java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:602)
 at
 java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1622)
 at
 java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
 at
 java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
 at
 java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
 at
 java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
 at
 java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
 at
 java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
 at
 java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
 at
 java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
 at
 java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
 at
 java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
 at
 java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
 at
 java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
 at
 java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
 at
 java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
 at
 java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
 at
 java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
 at
 java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
 at
 

problem writing to s3

2015-04-21 Thread Daniel Mahler
I am having a strange problem writing to s3 that I have distilled to this
minimal example:

def jsonRaw = s"${outprefix}-json-raw"
def jsonClean = s"${outprefix}-json-clean"

val txt = sc.textFile(inpath)//.coalesce(shards, false)
txt.count

val res = txt.saveAsTextFile(jsonRaw)

val txt2 = sc.textFile(jsonRaw + "/part-*")
txt2.count

txt2.saveAsTextFile(jsonClean)

This code should simply copy files from inpath to jsonRaw and then from
jsonRaw to jsonClean.
This code executes all the way down to the last line, where it hangs after
creating the output directory containing a _temporary_$folder but no
actual files, not even temporary ones.

`outprefix` is an S3 bucket URL; both jsonRaw and jsonClean are in the
same bucket.
Both .count calls succeed and return the same number. This means Spark can
read from inpath and can both read from and write to jsonRaw. Since
jsonClean is in the same bucket as jsonRaw and the final line does create
the directory, I cannot think of any reason why the files should not be
written. If there were any access or URL problems, they should already have
manifested when writing jsonRaw.

This problem is completely reproducible with Spark 1.2.1 and 1.3.1.
The console output from the last line is

scala> txt0.saveAsTextFile(jsonClean)
15/04/21 22:55:48 INFO storage.BlockManager: Removing broadcast 3
15/04/21 22:55:48 INFO storage.BlockManager: Removing block
broadcast_3_piece0
15/04/21 22:55:48 INFO storage.MemoryStore: Block broadcast_3_piece0 of
size 2024 dropped from memory (free 278251716)
15/04/21 22:55:48 INFO storage.BlockManagerInfo: Removed broadcast_3_piece0
on ip-10-51-181-81.ec2.internal:45199 in memory (size: 2024.0 B, free:
265.4 MB)
15/04/21 22:55:48 INFO storage.BlockManagerMaster: Updated info of block
broadcast_3_piece0
15/04/21 22:55:48 INFO storage.BlockManager: Removing block broadcast_3
15/04/21 22:55:48 INFO storage.MemoryStore: Block broadcast_3 of size 2728
dropped from memory (free 27825)
15/04/21 22:55:48 INFO storage.BlockManagerInfo: Removed broadcast_3_piece0
on ip-10-166-129-153.ec2.internal:46671 in memory (size: 2024.0 B, free:
13.8 GB)
15/04/21 22:55:48 INFO storage.BlockManagerInfo: Removed broadcast_3_piece0
on ip-10-51-153-34.ec2.internal:51691 in memory (size: 2024.0 B, free: 13.8
GB)
15/04/21 22:55:48 INFO storage.BlockManagerInfo: Removed broadcast_3_piece0
on ip-10-158-142-155.ec2.internal:54690 in memory (size: 2024.0 B, free:
13.8 GB)
15/04/21 22:55:48 INFO storage.BlockManagerInfo: Removed broadcast_3_piece0
on ip-10-61-144-7.ec2.internal:44849 in memory (size: 2024.0 B, free: 13.8
GB)
15/04/21 22:55:48 INFO storage.BlockManagerInfo: Removed broadcast_3_piece0
on ip-10-69-77-180.ec2.internal:42417 in memory (size: 2024.0 B, free: 13.8
GB)
15/04/21 22:55:48 INFO spark.ContextCleaner: Cleaned broadcast 3
15/04/21 22:55:49 INFO spark.SparkContext: Starting job: saveAsTextFile at
console:38
15/04/21 22:55:49 INFO scheduler.DAGScheduler: Got job 2 (saveAsTextFile at
console:38) with 96 output partitions (allowLocal=false)
15/04/21 22:55:49 INFO scheduler.DAGScheduler: Final stage: Stage
2(saveAsTextFile at console:38)
15/04/21 22:55:49 INFO scheduler.DAGScheduler: Parents of final stage:
List()
15/04/21 22:55:49 INFO scheduler.DAGScheduler: Missing parents: List()
15/04/21 22:55:49 INFO scheduler.DAGScheduler: Submitting Stage 2
(MapPartitionsRDD[5] at saveAsTextFile at console:38), which has no
missing parents
15/04/21 22:55:49 INFO storage.MemoryStore: ensureFreeSpace(22248) called
with curMem=48112, maxMem=278302556
15/04/21 22:55:49 INFO storage.MemoryStore: Block broadcast_4 stored as
values in memory (estimated size 21.7 KB, free 265.3 MB)
15/04/21 22:55:49 INFO storage.MemoryStore: ensureFreeSpace(17352) called
with curMem=70360, maxMem=278302556
15/04/21 22:55:49 INFO storage.MemoryStore: Block broadcast_4_piece0 stored
as bytes in memory (estimated size 16.9 KB, free 265.3 MB)
15/04/21 22:55:49 INFO storage.BlockManagerInfo: Added broadcast_4_piece0
in memory on ip-10-51-181-81.ec2.internal:45199 (size: 16.9 KB, free: 265.4
MB)
15/04/21 22:55:49 INFO storage.BlockManagerMaster: Updated info of block
broadcast_4_piece0
15/04/21 22:55:49 INFO spark.SparkContext: Created broadcast 4 from
broadcast at DAGScheduler.scala:839
15/04/21 22:55:49 INFO scheduler.DAGScheduler: Submitting 96 missing tasks
from Stage 2 (MapPartitionsRDD[5] at saveAsTextFile at console:38)
15/04/21 22:55:49 INFO scheduler.TaskSchedulerImpl: Adding task set 2.0
with 96 tasks
15/04/21 22:55:49 INFO scheduler.TaskSetManager: Starting task 0.0 in stage
2.0 (TID 192, ip-10-166-129-153.ec2.internal, PROCESS_LOCAL, 1377 bytes)
15/04/21 22:55:49 INFO scheduler.TaskSetManager: Starting task 1.0 in stage
2.0 (TID 193, ip-10-61-144-7.ec2.internal, PROCESS_LOCAL, 1377 bytes)
15/04/21 22:55:49 INFO scheduler.TaskSetManager: Starting task 2.0 in stage
2.0 (TID 194, ip-10-158-142-155.ec2.internal, PROCESS_LOCAL, 1377 bytes)
15/04/21 22:55:49 INFO scheduler.TaskSetManager: Starting task 3.0 

Efficient saveAsTextFile by key, directory for each key?

2015-04-21 Thread Arun Luthra
Is there an efficient way to save an RDD with saveAsTextFile in such a way
that the data gets shuffled into separated directories according to a key?
(My end goal is to wrap the result in a multi-partitioned Hive table)

Suppose you have:

case class MyData(val0: Int, val1: String, directory_name: String)

and an RDD called myrdd with type RDD[MyData]. Suppose that you already
have an array of the distinct directory_name's, called distinct_directories.

A very inefficient way to do this is:

distinct_directories.foreach(
  dir_name => myrdd.filter( mydata => mydata.directory_name == dir_name )
.map( mydata => Array(mydata.val0.toString, mydata.val1).mkString(","))
.coalesce(5)
.saveAsTextFile("base_dir_name/" + f"$dir_name")
)

I tried this solution, and it does not do the multiple myrdd.filter's in
parallel.

I'm guessing partitionBy might be in the efficient solution if an easy
efficient solution exists...

Thanks,
Arun
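
One way to avoid the per-directory filter passes entirely is a single saveAsHadoopFile with a MultipleTextOutputFormat that derives the output subdirectory from the key. Below is a sketch built on the names from the example above (the Hive-table wrapping step is not shown):

import org.apache.hadoop.io.NullWritable
import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat
import org.apache.spark.HashPartitioner

// writes each record under base_dir_name/<directory_name>/part-NNNNN
class KeyBasedOutput extends MultipleTextOutputFormat[Any, Any] {
  override def generateActualKey(key: Any, value: Any): Any = NullWritable.get()
  override def generateFileNameForKeyValue(key: Any, value: Any, name: String): String =
    key.asInstanceOf[String] + "/" + name
}

myrdd
  .map(d => (d.directory_name, Array(d.val0.toString, d.val1).mkString(",")))
  .partitionBy(new HashPartitioner(distinct_directories.length))
  .saveAsHadoopFile("base_dir_name", classOf[String], classOf[String], classOf[KeyBasedOutput])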


Re: Task not Serializable: Graph is unexpectedly null when DStream is being serialized

2015-04-21 Thread Tathagata Das
Yeah, I am not sure what is going on. The only way to figure to take a look
at the disassembled bytecodes using javap.

TD

On Tue, Apr 21, 2015 at 1:53 PM, Jean-Pascal Billaud j...@tellapart.com
wrote:

 At this point I am assuming that nobody has an idea... I am still going to
 give it a last shot just in case it was missed by some people :)

 Thanks,

 On Mon, Apr 20, 2015 at 2:20 PM, Jean-Pascal Billaud j...@tellapart.com
 wrote:

 Hey, so I start the context at the very end when all the piping is done.
 BTW a foreachRDD will be called on the resulting dstream.map() right after
 that.

 The puzzling thing is why removing the context bounds solve the
 problem... What does this exception mean in general?

 On Mon, Apr 20, 2015 at 1:33 PM, Tathagata Das t...@databricks.com
 wrote:

 When are you getting this exception? After starting the context?

 TD

 On Mon, Apr 20, 2015 at 10:44 AM, Jean-Pascal Billaud j...@tellapart.com
 wrote:

 Hi,

 I am getting this serialization exception and I am not too sure what
 Graph is unexpectedly null when DStream is being serialized means?

 15/04/20 06:12:38 INFO yarn.ApplicationMaster: Final app status:
 FAILED, exitCode: 15, (reason: User class threw exception: Task not
 serializable)
 Exception in thread Driver org.apache.spark.SparkException: Task not
 serializable
 at org.apache.spark.util.ClosureCleaner$.ensureSerializable(
 ClosureCleaner.scala:166)
 at org.apache.spark.util.ClosureCleaner$.clean(
 ClosureCleaner.scala:158)
 at org.apache.spark.SparkContext.clean(SparkContext.scala:1435)
 at org.apache.spark.streaming.dstream.DStream.map(DStream.
 scala:438)
 [...]
 Caused by: java.io.NotSerializableException: Graph is unexpectedly
 null when DStream is being serialized.
 at org.apache.spark.streaming.dstream.DStream$anonfun$
 writeObject$1.apply$mcV$sp(DStream.scala:420)
 at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:
 985)
 at org.apache.spark.streaming.dstream.DStream.writeObject(
 DStream.scala:403)

 The operation comes down to something like this:

 dstream.map(tuple => {
 val w = StreamState.fetch[K,W](state.prefixKey, tuple._1)
 (tuple._1, (tuple._2, w)) })

 And StreamState being a very simple standalone object:

 object StreamState {
   def fetch[K : ClassTag : Ordering, V : ClassTag](prefixKey: String,
 key: K) : Option[V] = None
 }

 However if I remove the context bounds from K in fetch e.g. removing
 ClassTag and Ordering then everything is fine.

 If anyone has some pointers, I'd really appreciate it.

 Thanks,







Re: Number of input partitions in SparkContext.sequenceFile

2015-04-21 Thread Archit Thakur
Hi,

It should generate the same number of partitions as the number of splits.
How did you check the number of partitions? Also, please paste your file size and
your hdfs-site.xml and mapred-site.xml here.

Thanks and Regards,
Archit Thakur.
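
For reference, a quick way to check what Spark actually created, and to pass the minPartitions hint (a sketch; the Text/IntWritable record types and the path are assumptions):

import org.apache.hadoop.io.{IntWritable, Text}

// the last argument is minPartitions: only a lower bound handed to the
// InputFormat; the actual split count also depends on file size and block size
val rdd = sc.sequenceFile("/path/to/data", classOf[Text], classOf[IntWritable], 4)
println(rdd.partitions.length)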

On Sat, Apr 18, 2015 at 6:20 PM, Wenlei Xie wenlei@gmail.com wrote:

 Hi,

 I am wondering the mechanism that determines the number of partitions
 created by SparkContext.sequenceFile ?

 For example, although my file has only 4 splits, Spark would create 16
 partitions for it. Is it determined by the file size? Is there any way to
 control it? (Looks like I can only tune minPartitions but not maxPartitions)

 Thank you!

 Best,
 Wenlei





Meet Exception when learning Broadcast Variables

2015-04-21 Thread donhoff_h
Hi, experts.

I wrote a very small program to learn how to use Broadcast Variables, but met
an exception. The program and the exception are listed below. Could
anyone help me to solve this problem? Thanks!

**My Program is as follows**
object TestBroadcast02 {
 var brdcst : Broadcast[Array[Int]] = null

 def main(args: Array[String]) {
   val conf = new SparkConf()
   val sc = new SparkContext(conf)
   brdcst = sc.broadcast(Array(1,2,3,4,5,6))
   val rdd =
sc.textFile("hdfs://bgdt-dev-hrb/user/spark/tst/charset/A_utf8.txt")
   rdd.foreachPartition(fun1)
   sc.stop()
 }

 def fun1(it : Iterator[String]) : Unit = {
   val v = brdcst.value
   for(i <- v) println("BroadCast Variable:"+i)
   for(j <- it) println("Text File Content:"+j)
 }
}
**The Exception is as follows**
15/04/21 17:39:53 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 0.0 
(TID 0, bgdt01.dev.hrb): java.lang.NullPointerException
at 
dhao.test.BroadCast.TestBroadcast02$.fun1(TestBroadcast02.scala:27)
at 
dhao.test.BroadCast.TestBroadcast02$$anonfun$main$1.apply(TestBroadcast02.scala:22)
at 
dhao.test.BroadCast.TestBroadcast02$$anonfun$main$1.apply(TestBroadcast02.scala:22)
at 
org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:806)
at 
org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:806)
at 
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1497)
at 
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1497)
at 
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
at org.apache.spark.scheduler.Task.run(Task.scala:64)
at 
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)

By the way, if I use an anonymous function instead of 'fun1' in my program, it
works. But since I think the readability of anonymous functions is not as good,
I would still prefer to use 'fun1'.
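
The NullPointerException is likely because brdcst is a field of the TestBroadcast02 object: main() only runs on the driver, so on the executors the object is initialized with that field still null when fun1 dereferences it. Below is a sketch of a variant that keeps a named function but passes the broadcast in explicitly (imports as in the original program):

def main(args: Array[String]) {
  val conf = new SparkConf()
  val sc = new SparkContext(conf)
  val brdcst = sc.broadcast(Array(1, 2, 3, 4, 5, 6))   // local val, captured by the closure
  val rdd = sc.textFile("hdfs://bgdt-dev-hrb/user/spark/tst/charset/A_utf8.txt")
  rdd.foreachPartition(fun1(brdcst) _)
  sc.stop()
}

// taking the broadcast as a parameter keeps the readable named function
def fun1(b: Broadcast[Array[Int]])(it: Iterator[String]): Unit = {
  for (i <- b.value) println("BroadCast Variable:" + i)
  for (j <- it) println("Text File Content:" + j)
}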

Re: Spark Unit Testing

2015-04-21 Thread Emre Sevinc
Hello James,

Did you check the following resources:

 -
https://github.com/apache/spark/tree/master/streaming/src/test/java/org/apache/spark/streaming

 -
http://www.slideshare.net/databricks/strata-sj-everyday-im-shuffling-tips-for-writing-better-spark-programs
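
One common pattern for such tests is to drive the input DStream from an in-memory queue of RDDs; the sketch below is Scala, and the Java API has an analogous JavaStreamingContext.queueStream (a JavaDStream can then be turned into a JavaPairDStream with mapToPair):

import scala.collection.mutable
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setMaster("local[2]").setAppName("dstream-unit-test")
val ssc = new StreamingContext(conf, Seconds(1))

// each queued RDD becomes one micro-batch of test input
val input = mutable.Queue(ssc.sparkContext.parallelize(Seq("k1" -> "v1", "k2" -> "v2")))
val testStream = ssc.queueStream(input)            // DStream[(String, String)]

val results = mutable.ArrayBuffer.empty[(String, String)]
testStream.foreachRDD(rdd => results ++= rdd.collect())

ssc.start()
Thread.sleep(3000)                                 // let a few batches run
ssc.stop(stopSparkContext = true)

assert(results.toSet == Set("k1" -> "v1", "k2" -> "v2"))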

--
Emre Sevinç
http://www.bigindustries.be/


On Tue, Apr 21, 2015 at 1:26 PM, James King jakwebin...@gmail.com wrote:

 I'm trying to write some unit tests for my spark code.

 I need to pass a JavaPairDStreamString, String to my spark class.

 Is there a way to create a JavaPairDStream using Java API?

 Also is there a good resource that covers an approach (or approaches) for
 unit testing using Java.

 Regards
 jk




-- 
Emre Sevinc


Re: SparkSQL performance

2015-04-21 Thread Renato Marroquín Mogrovejo
Thanks for the hints guys! Much appreciated!
Even if I just do something like:

Select * from tableX where attribute1 < 5

I see similar behaviour.

@Michael
Could you point me to any sample code that uses Spark's Rows? We are at a
phase where we can actually change our JavaBeans for something that
provides better performance than what we are seeing now. Would you
recommend using an Avro representation then?
Thanks again!


Renato M.

2015-04-21 1:18 GMT+02:00 Michael Armbrust mich...@databricks.com:

 There is a cost to converting from JavaBeans to Rows and this code path
 has not been optimized.  That is likely what you are seeing.

 On Mon, Apr 20, 2015 at 3:55 PM, ayan guha guha.a...@gmail.com wrote:

 SparkSQL optimizes better by column pruning and predicate pushdown,
 primarily. Here you are not taking advantage of either.

 I am curious to know what goes in your filter function, as you are not
 using a filter in SQL side.

 Best
 Ayan
 On 21 Apr 2015 08:05, Renato Marroquín Mogrovejo 
 renatoj.marroq...@gmail.com wrote:

 Does anybody have an idea? a clue? a hint?
 Thanks!


 Renato M.

 2015-04-20 9:31 GMT+02:00 Renato Marroquín Mogrovejo 
 renatoj.marroq...@gmail.com:

 Hi all,

 I have a simple query Select * from tableX where attribute1 between 0
 and 5 that I run over a Kryo file with four partitions that ends up being
 around 3.5 million rows in our case.
 If I run this query by doing a simple map().filter() it takes around
 ~9.6 seconds but when I apply schema, register the table into a SqlContext,
 and then run the query, it takes around ~16 seconds. This is using Spark
 1.2.1 with Scala 2.10.0
 I am wondering why there is such a big gap in performance if it is just
 a filter. Internally, the relation files are mapped to a JavaBean. Could this
 different data representation (JavaBeans vs SparkSQL internal representation)
 lead to such a difference? Is there anything I could do to make the
 performance get closer to the hard-coded option?
 Thanks in advance for any suggestions or ideas.


 Renato M.






Spark Unit Testing

2015-04-21 Thread James King
I'm trying to write some unit tests for my spark code.

I need to pass a JavaPairDStreamString, String to my spark class.

Is there a way to create a JavaPairDStream using Java API?

Also is there a good resource that covers an approach (or approaches) for
unit testing using Java.

Regards
jk


Error while running SparkPi in Hadoop HA

2015-04-21 Thread Fernando O.
Hi all,

I'm wondering if SparkPi works with hadoop HA (I guess it should)


Hadoop's pi example works great on my cluster, so after having that
done I installed spark and in the worker log I'm seeing two problems
that might be related.


Versions: Hadoop 2.6.0

  Spark 1.3.1


I'm running :

./bin/spark-submit --class org.apache.spark.examples.SparkPi
--master yarn-cluster --num-executors 2 --driver-memory 2g
--executor-memory 1g --executor-cores 1
lib/spark-examples*.jar 10


It seems like it can't see the ResourceManager, but I pinged and telneted to it
from the node and that works; also, Hadoop's pi example works...

So I'm not sure why Spark is not seeing the RM.



15/04/21 15:56:21 INFO yarn.YarnRMClient: Registering the ApplicationMaster
15/04/21 15:56:22 INFO ipc.Client: Retrying connect to server:
0.0.0.0/0.0.0.0:8030. Already tried 0 time(s); retry policy is
RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000
MILLISECONDS)
15/04/21 15:56:23 INFO ipc.Client: Retrying connect to server:
0.0.0.0/0.0.0.0:8030. Already tried 1 time(s); retry policy is
RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000
MILLISECONDS)
15/04/21 15:56:24 INFO ipc.Client: Retrying connect to server:
0.0.0.0/0.0.0.0:8030. Already tried 2 time(s); retry policy is
RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000
MILLISECONDS)
15/04/21 15:56:25 INFO ipc.Client: Retrying connect to server:
0.0.0.0/0.0.0.0:8030. Already tried 3 time(s); retry policy is
RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000
MILLISECONDS)
15/04/21 15:56:26 INFO ipc.Client: Retrying connect to server:
0.0.0.0/0.0.0.0:8030. Already tried 4 time(s); retry policy is
RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000
MILLISECONDS)
15/04/21 15:56:27 INFO ipc.Client: Retrying connect to server:
0.0.0.0/0.0.0.0:8030. Already tried 5 time(s); retry policy is
RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000
MILLISECONDS)
15/04/21 15:56:28 INFO ipc.Client: Retrying connect to server:
0.0.0.0/0.0.0.0:8030. Already tried 6 time(s); retry policy is
RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000
MILLISECONDS)
15/04/21 15:56:29 INFO ipc.Client: Retrying connect to server:
0.0.0.0/0.0.0.0:8030. Already tried 7 time(s); retry policy is
RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000
MILLISECONDS)
15/04/21 15:56:30 INFO ipc.Client: Retrying connect to server:
0.0.0.0/0.0.0.0:8030. Already tried 8 time(s); retry policy is
RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000
MILLISECONDS)
15/04/21 15:56:31 INFO ipc.Client: Retrying connect to server:
0.0.0.0/0.0.0.0:8030. Already tried 9 time(s); retry policy is
RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000
MILLISECONDS)
15/04/21 15:56:51 INFO cluster.YarnClusterSchedulerBackend:
SchedulerBackend is ready for scheduling beginning after waiting
maxRegisteredResourcesWaitingTime: 3(ms)
15/04/21 15:56:51 INFO cluster.YarnClusterScheduler:
YarnClusterScheduler.postStartHook done
15/04/21 15:56:51 INFO spark.SparkContext: Starting job: reduce at
SparkPi.scala:35
15/04/21 15:56:51 INFO scheduler.DAGScheduler: Got job 0 (reduce at
SparkPi.scala:35) with 10 output partitions (allowLocal=false)
15/04/21 15:56:51 INFO scheduler.DAGScheduler: Final stage: Stage
0(reduce at SparkPi.scala:35)
15/04/21 15:56:51 INFO scheduler.DAGScheduler: Parents of final stage: List()
15/04/21 15:56:51 INFO scheduler.DAGScheduler: Missing parents: List()
15/04/21 15:56:51 INFO scheduler.DAGScheduler: Submitting Stage 0
(MapPartitionsRDD[1] at map at SparkPi.scala:31), which has no missing
parents
15/04/21 15:56:51 INFO cluster.YarnClusterScheduler: Cancelling stage 0
15/04/21 15:56:51 INFO scheduler.DAGScheduler: Stage 0 (reduce at
SparkPi.scala:35) failed in Unknown s
15/04/21 15:56:51 INFO scheduler.DAGScheduler: Job 0 failed: reduce at
SparkPi.scala:35, took 0.121974 s
15/04/21 15:56:51 ERROR yarn.ApplicationMaster: User class threw exception: Job aborted due to
stage failure: Task serialization failed:
java.lang.reflect.InvocationTargetException
sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
java.lang.reflect.Constructor.newInstance(Constructor.java:526)
org.apache.spark.io.CompressionCodec$.createCodec(CompressionCodec.scala:68)
org.apache.spark.io.CompressionCodec$.createCodec(CompressionCodec.scala:60)
org.apache.spark.broadcast.TorrentBroadcast.org$apache$spark$broadcast$TorrentBroadcast$$setConf(TorrentBroadcast.scala:73)
org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:79)

Re: Join on DataFrames from the same source (Pyspark)

2015-04-21 Thread ayan guha
You are correct. I just found the same thing. You are better off with SQL,
then.

userSchemaDF = ssc.createDataFrame(userRDD)

userSchemaDF.registerTempTable("users")
#print userSchemaDF.take(10)

#SQL API works as expected

sortedDF = ssc.sql("SELECT userId,age,gender,work from users order by
work,age")
#print sortedDF.take(10)
df1 = userSchemaDF.select("userId","age")
df1.registerTempTable("df1")
df2 = userSchemaDF.select("userId","work")
df2.registerTempTable("df2")

dfjs = ssc.sql("select age,work from df1 d1 inner join df2 d2 on
d1.userId=d2.userId")
print dfjs.count()

#DF API does not
dfj = df1.join(df2,df1.userId==df2.userId,"inner")
print dfj.count()

943
889249

On Wed, Apr 22, 2015 at 1:10 AM, Karlson ksonsp...@siberie.de wrote:

 Sorry, my code actually was

 df_one = df.select('col1', 'col2')
 df_two = df.select('col1', 'col3')

 But in Spark 1.4.0 this does not seem to make any difference anyway and
 the problem is the same with both versions.



 On 2015-04-21 17:04, ayan guha wrote:

 your code should be

  df_one = df.select('col1', 'col2')
  df_two = df.select('col1', 'col3')

 Your current code is generating a tuple, and of course df_1 and df_2 are
 different, so the join is yielding a cartesian product.

 Best
 Ayan

 On Wed, Apr 22, 2015 at 12:42 AM, Karlson ksonsp...@siberie.de wrote:

  Hi,

 can anyone confirm (and if so elaborate on) the following problem?

 When I join two DataFrames that originate from the same source DataFrame,
 the resulting DF will explode to a huge number of rows. A quick example:

 I load a DataFrame with n rows from disk:

 df = sql_context.parquetFile('data.parquet')

 Then I create two DataFrames from that source.

 df_one = df.select(['col1', 'col2'])
 df_two = df.select(['col1', 'col3'])

 Finally I want to (inner) join them back together:

 df_joined = df_one.join(df_two, df_one['col1'] == df_two['col2'],
 'inner')

 The key in col1 is unique. The resulting DataFrame should have n rows,
 however it does have n*n rows.

 That does not happen, when I load df_one and df_two from disk directly. I
 am on Spark 1.3.0, but this also happens on the current 1.4.0 snapshot.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org





-- 
Best Regards,
Ayan Guha


Re: Instantiating/starting Spark jobs programmatically

2015-04-21 Thread Richard Marscher
Could you possibly describe what you are trying to learn how to do in more
detail? Some basics of submitting programmatically (a short sketch follows the list below):

- Create a SparkContext instance and use that to build your RDDs
- You can only have 1 SparkContext per JVM you are running, so if you need
to satisfy concurrent job requests you would need to manage a SparkContext
as a shared resource on that server. Keep in mind if something goes wrong
with that SparkContext, all running jobs would probably be in a failed
state and you'd need to try to get a new SparkContext.
- There are System.exit calls built into Spark as of now that could kill
your running JVM. We have shadowed some of the most offensive bits within
our own application to work around this. You'd likely want to do that or to
do your own Spark fork. For example, if the SparkContext can't connect to
your cluster master node when it is created, it will System.exit.
- You'll need to provide all of the relevant classes that your platform
uses in the jobs on the classpath of the spark cluster. We do this with a
JAR file loaded from S3 dynamically by a SparkContext, but there are other
options.
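
A minimal sketch of those points (the cluster URL and the S3 jar path below are placeholders, not a prescription):

import org.apache.spark.{SparkConf, SparkContext}

// one shared SparkContext per server JVM, reused across job requests
val conf = new SparkConf()
  .setAppName("analytics-service")
  .setMaster("spark://master-host:7077")                       // placeholder cluster URL
  .setJars(Seq("s3n://my-bucket/platform-jobs-assembly.jar"))  // placeholder jar with the job classes

val sc = new SparkContext(conf)

// each incoming request builds RDDs against the shared context
def handleRequest(path: String): Long = sc.textFile(path).count()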

On Mon, Apr 20, 2015 at 10:12 PM, firemonk9 dhiraj.peech...@gmail.com
wrote:

 I have built a data analytics SaaS platform by creating Rest end points and
 based on the type of job request I would invoke the necessary spark
 job/jobs
 and return the results as json(async). I used yarn-client mode to submit
 the
 jobs to yarn cluster.

 hope this helps.





 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Instantiating-starting-Spark-jobs-programmatically-tp22577p22584.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org