Re: Spark Streaming graceful shutdown in Spark 1.4

2015-05-19 Thread Dibyendu Bhattacharya
By the way, this happens when I stopped the Driver process ...

On Tue, May 19, 2015 at 12:29 PM, Dibyendu Bhattacharya 
dibyendu.bhattach...@gmail.com wrote:

 You mean to say within Runtime.getRuntime().addShutdownHook I call
 ssc.stop(stopSparkContext = true, stopGracefully = true)?

 This won't work anymore in 1.4.

 The SparkContext gets stopped before the Receiver has processed all received blocks,
 and I see the exception below in the logs. But if I add Utils.addShutdownHook
 with the priority I mentioned, then graceful shutdown works; in that case the
 shutdown hooks run in priority order.



 *INFO : org.apache.spark.streaming.scheduler.ReceiverTracker - Sent stop
 signal to all 3 receivers*
 ERROR: org.apache.spark.streaming.scheduler.ReceiverTracker - Deregistered
 receiver for stream 0: Stopped by driver
 ERROR: org.apache.spark.streaming.scheduler.ReceiverTracker - Deregistered
 receiver for stream 1: Stopped by driver
 ERROR: org.apache.spark.streaming.scheduler.ReceiverTracker - Deregistered
 receiver for stream 2: Stopped by driver
 *INFO : org.apache.spark.SparkContext - Invoking stop() from shutdown hook*
 INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped
 o.s.j.s.ServletContextHandler{/streaming/batch/json,null}
 INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped
 o.s.j.s.ServletContextHandler{/streaming/batch,null}
 INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped
 o.s.j.s.ServletContextHandler{/streaming/json,null}
 INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped
 o.s.j.s.ServletContextHandler{/streaming,null}
 INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped
 o.s.j.s.ServletContextHandler{/metrics/json,null}
 INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped
 o.s.j.s.ServletContextHandler{/stages/stage/kill,null}
 INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped
 o.s.j.s.ServletContextHandler{/,null}
 INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped
 o.s.j.s.ServletContextHandler{/static,null}
 INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped
 o.s.j.s.ServletContextHandler{/executors/threadDump/json,null}
 INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped
 o.s.j.s.ServletContextHandler{/executors/threadDump,null}
 INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped
 o.s.j.s.ServletContextHandler{/executors/json,null}
 INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped
 o.s.j.s.ServletContextHandler{/executors,null}
 INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped
 o.s.j.s.ServletContextHandler{/environment/json,null}
 INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped
 o.s.j.s.ServletContextHandler{/environment,null}
 INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped
 o.s.j.s.ServletContextHandler{/storage/rdd/json,null}
 INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped
 o.s.j.s.ServletContextHandler{/storage/rdd,null}
 INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped
 o.s.j.s.ServletContextHandler{/storage/json,null}
 INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped
 o.s.j.s.ServletContextHandler{/storage,null}
 INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped
 o.s.j.s.ServletContextHandler{/stages/pool/json,null}
 INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped
 o.s.j.s.ServletContextHandler{/stages/pool,null}
 INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped
 o.s.j.s.ServletContextHandler{/stages/stage/json,null}
 INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped
 o.s.j.s.ServletContextHandler{/stages/stage,null}
 INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped
 o.s.j.s.ServletContextHandler{/stages/json,null}
 INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped
 o.s.j.s.ServletContextHandler{/stages,null}
 INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped
 o.s.j.s.ServletContextHandler{/jobs/job/json,null}
 INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped
 o.s.j.s.ServletContextHandler{/jobs/job,null}
 INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped
 o.s.j.s.ServletContextHandler{/jobs/json,null}
 INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped
 o.s.j.s.ServletContextHandler{/jobs,null}
 INFO : org.apache.spark.ui.SparkUI - Stopped Spark web UI at
 http://10.252.5.113:4040
 INFO : org.apache.spark.scheduler.DAGScheduler - Stopping DAGScheduler
 INFO : org.apache.spark.scheduler.DAGScheduler - Job 4 failed: start at
 Consumer.java:122, took 10.398746 s
 *Exception in thread Thread-28 org.apache.spark.SparkException: Job
 cancelled because SparkContext was shut down*
 at
 

Re: TwitterUtils on Windows

2015-05-19 Thread Akhil Das
Hi Justin,

Can you try with sbt? Maybe that will help.

- Install sbt for windows
http://www.scala-sbt.org/0.13/tutorial/Installing-sbt-on-Windows.html

- Create a lib directory in your project directory
- Place these jars in it:
- spark-streaming-twitter_2.10-1.3.1.jar
- twitter4j-async-3.0.3.jar
- twitter4j-core-3.0.3.jar
- twitter4j-media-support-3.0.3.jar
- twitter4j-stream-3.0.3.jar

- Create a build.sbt file and add these contents:

name := "twitterStream"

version := "1.0"

scalaVersion := "2.10.4"

libraryDependencies += "org.apache.spark" % "spark-streaming_2.10" % "1.3.1"

- Create a TwitterStream.scala and add these contents:


import org.apache.spark.streaming.twitter._
import org.apache.spark.streaming._
import org.apache.spark.{SparkContext, SparkConf}

object TwitterStream {
  def main(args: Array[String]) {

    System.setProperty("twitter4j.oauth.consumerKey", "*")
    System.setProperty("twitter4j.oauth.consumerSecret", "*")
    System.setProperty("twitter4j.oauth.accessToken", "*")
    System.setProperty("twitter4j.oauth.accessTokenSecret", "*")

    val sconf = new SparkConf()
      .setMaster("local[*]")
      .setAppName("TwitterStream")

    val sc = new SparkContext(sconf)

    val ssc = new StreamingContext(sc, Seconds(10))
    val stream = TwitterUtils.createStream(ssc, None)
    stream.print() // an output operation is required before starting the context
    ssc.start()
    ssc.awaitTermination()
  }
}


- Now do a sbt run



Thanks
Best Regards

On Tue, May 19, 2015 at 9:56 AM, Justin Pihony justin.pih...@gmail.com
wrote:

 I think I found the answer -
 http://apache-spark-user-list.1001560.n3.nabble.com/Error-while-running-example-scala-application-using-spark-submit-td10056.html

 Do I have no way of running this in Windows locally?


 On Mon, May 18, 2015 at 10:44 PM, Justin Pihony justin.pih...@gmail.com
 wrote:

 I'm not 100% sure that is causing a problem, though. The stream still
 starts, but is giving blank output. I checked the environment variables in
 the ui and it is running local[*], so there should be no bottleneck there.

 On Mon, May 18, 2015 at 10:08 PM, Justin Pihony justin.pih...@gmail.com
 wrote:

 I am trying to print a basic twitter stream and receiving the following
 error:


 15/05/18 22:03:14 INFO Executor: Fetching
 http://192.168.56.1:49752/jars/twitter4j-media-support-3.0.3.jar with
 timestamp 1432000973058
 15/05/18 22:03:14 INFO Utils: Fetching
 http://192.168.56.1:49752/jars/twitter4j-media-support-3.0.3.jar to
 C:\Users\Justin\AppData\Local\Temp\spark-4a37d3

 e9-34a2-40d4-b09b-6399931f527d\userFiles-65ee748e-4721-4e16-9fe6-65933651fec1\fetchFileTemp8970201232303518432.tmp
 15/05/18 22:03:14 ERROR Executor: Exception in task 0.0 in stage 0.0
 (TID 0)
 java.lang.NullPointerException
 at java.lang.ProcessBuilder.start(ProcessBuilder.java:1012)
 at org.apache.hadoop.util.Shell.runCommand(Shell.java:482)
 at org.apache.hadoop.util.Shell.run(Shell.java:455)
 at
 org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.ja
 va:715)
 at org.apache.hadoop.fs.FileUtil.chmod(FileUtil.java:873)
 at org.apache.hadoop.fs.FileUtil.chmod(FileUtil.java:853)
 at org.apache.spark.util.Utils$.fetchFile(Utils.scala:443)
 at

 org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$updateDependencies$5.apply(Executor.scala:374)
 at

 org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$updateDependencies$5.apply(Executor.scala:366)
 at

 scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
 at

 scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
 at

 scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
 at

 scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
 at
 scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
 at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
 at

 scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
 at
 org.apache.spark.executor.Executor.org
 $apache$spark$executor$Executor$$updateDependencies(Executor.scala:366)
 at
 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:184)
 at

 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
 at

 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
 at java.lang.Thread.run(Thread.java:744)


 Code is:

 spark-shell --jars

 \Spark\lib\spark-streaming-twitter_2.10-1.3.1.jar,\Spark\lib\twitter4j-async-3.0.3.jar,\Spark\lib\twitter4j-core-3.0.3.jar,\Spark\lib\twitter4j-media-support-3.0.3.jar,\Spark\lib\twitter4j-stream-3.0.3.jar

 import org.apache.spark.streaming.twitter._
 import org.apache.spark.streaming._

 System.setProperty("twitter4j.oauth.consumerKey", "*")
 System.setProperty("twitter4j.oauth.consumerSecret", "*")
 System.setProperty("twitter4j.oauth.accessToken", "*")
 

Re: Spark Streaming graceful shutdown in Spark 1.4

2015-05-19 Thread Sean Owen
I don't think you should rely on a shutdown hook. Ideally you try to
stop it in the main exit path of your program, even in case of an
exception.

On Tue, May 19, 2015 at 7:59 AM, Dibyendu Bhattacharya
dibyendu.bhattach...@gmail.com wrote:
 You mean to say within Runtime.getRuntime().addShutdownHook I call
 ssc.stop(stopSparkContext = true, stopGracefully = true)?

 This won't work anymore in 1.4.

 The SparkContext gets stopped before the Receiver has processed all received blocks,
 and I see the exception below in the logs. But if I add Utils.addShutdownHook
 with the priority I mentioned, then graceful shutdown works; in that case the
 shutdown hooks run in priority order.


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark Streaming graceful shutdown in Spark 1.4

2015-05-19 Thread Dibyendu Bhattacharya
Thanks Sean, you are right. If the driver program is running then I can handle
shutdown in the main exit path. But if the driver machine crashes (or if you just
stop the application, for example by killing the driver process), then a
shutdown hook is the only option, isn't it? What I am trying to say is that just
calling ssc.stop in sys.ShutdownHookThread or
Runtime.getRuntime().addShutdownHook (in Java) won't work anymore. I need
to use Utils.addShutdownHook with a priority. So I am just checking whether
Spark Streaming can make graceful shutdown the default shutdown mechanism.
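
(For reference, a minimal Scala sketch of the pattern described above. It assumes the
Utils.addShutdownHook(priority)(hook) method referenced in this thread; that method is
a private[spark] API in Spark 1.4, so whether you can call it directly depends on how
your code is packaged, and the priority of 150 is just an example value above
SparkContext's shutdown priority of 50.)

import org.apache.spark.streaming.StreamingContext
import org.apache.spark.util.Utils

def registerGracefulStreamingStop(ssc: StreamingContext): Unit = {
  // Runs before SparkContext's own shutdown hook (priority 50),
  // so the streaming job can drain gracefully first.
  Utils.addShutdownHook(150) { () =>
    ssc.stop(stopSparkContext = false, stopGracefully = true)
  }
}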

Dibyendu

On Tue, May 19, 2015 at 1:03 PM, Sean Owen so...@cloudera.com wrote:

 I don't think you should rely on a shutdown hook. Ideally you try to
 stop it in the main exit path of your program, even in case of an
 exception.

 On Tue, May 19, 2015 at 7:59 AM, Dibyendu Bhattacharya
 dibyendu.bhattach...@gmail.com wrote:
  You mean to say within Runtime.getRuntime().addShutdownHook I call
  ssc.stop(stopSparkContext = true, stopGracefully = true)?
 
  This won't work anymore in 1.4.
 
  The SparkContext gets stopped before the Receiver has processed all received
 blocks, and I see the exception below in the logs. But if I add Utils.addShutdownHook
  with the priority I mentioned, then graceful shutdown works; in that case the
  shutdown hooks run in priority order.
 



Re: group by and distinct performance issue

2015-05-19 Thread Akhil Das
Hi Peer,

If you open the driver UI (running on port 4040) you can see the stages and
the tasks happening inside them. The best way to identify the bottleneck for a
stage is to see whether any time is being spent on GC, and how many tasks there
are per stage (it should be a number >= the total # of cores to achieve max
parallelism). You can also take into consideration how long each task takes.

Thanks
Best Regards

On Tue, May 19, 2015 at 12:58 PM, Peer, Oded oded.p...@rsa.com wrote:

  I am running Spark over Cassandra to process a single table.

 My task reads a single day's worth of data from the table and performs 50
 group by and distinct operations, counting distinct userIds by different
 grouping keys.

 My code looks like this:



JavaRDD<Row> rdd = sc.parallelize().mapPartitions().cache() // reads
 the data from the table

for each groupingKey {

   JavaPairRDD<GroupingKey, UserId> groupByRdd = rdd.mapToPair();

   JavaPairRDD<GroupingKey, Long> countRdd =
 groupByRdd.distinct().mapToPair().reduceByKey() // counts distinct values
 per grouping key

}



 The distinct() stage takes about 2 minutes for every groupByValue, and my
 task takes well over an hour to complete.

 My cluster has 4 nodes and 30 GB of RAM per Spark process, the table size
 is 4 GB.



 How can I identify the bottleneck more accurately? Is it caused by
 shuffling data?

 How can I improve the performance?



 Thanks,

 Oded



Re: --jars works in yarn-client but not yarn-cluster mode, why?

2015-05-19 Thread Fengyun RAO
Thanks, Marcelo!


Below is the full log,


SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in
[jar:file:/opt/cloudera/parcels/CDH-5.4.0-1.cdh5.4.0.p0.27/jars/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in
[jar:file:/opt/cloudera/parcels/CDH-5.4.0-1.cdh5.4.0.p0.27/jars/avro-tools-1.7.6-cdh5.4.0.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
15/05/19 14:08:58 INFO yarn.ApplicationMaster: Registered signal
handlers for [TERM, HUP, INT]
15/05/19 14:08:59 INFO yarn.ApplicationMaster: ApplicationAttemptId:
appattempt_1432015548391_0003_01
15/05/19 14:09:00 INFO spark.SecurityManager: Changing view acls to:
nobody,raofengyun
15/05/19 14:09:00 INFO spark.SecurityManager: Changing modify acls to:
nobody,raofengyun
15/05/19 14:09:00 INFO spark.SecurityManager: SecurityManager:
authentication disabled; ui acls disabled; users with view
permissions: Set(nobody, raofengyun); users with modify permissions:
Set(nobody, raofengyun)
15/05/19 14:09:00 INFO yarn.ApplicationMaster: Starting the user
application in a separate Thread
15/05/19 14:09:00 INFO yarn.ApplicationMaster: Waiting for spark
context initialization
15/05/19 14:09:00 INFO yarn.ApplicationMaster: Waiting for spark
context initialization ...
15/05/19 14:09:00 INFO spark.SparkContext: Running Spark version 1.3.0
15/05/19 14:09:00 INFO spark.SecurityManager: Changing view acls to:
nobody,raofengyun
15/05/19 14:09:00 INFO spark.SecurityManager: Changing modify acls to:
nobody,raofengyun
15/05/19 14:09:00 INFO spark.SecurityManager: SecurityManager:
authentication disabled; ui acls disabled; users with view
permissions: Set(nobody, raofengyun); users with modify permissions:
Set(nobody, raofengyun)
15/05/19 14:09:01 INFO slf4j.Slf4jLogger: Slf4jLogger started
15/05/19 14:09:01 INFO Remoting: Starting remoting
15/05/19 14:09:01 INFO Remoting: Remoting started; listening on
addresses :[akka.tcp://sparkDriver@gs-server-v-127:7191]
15/05/19 14:09:01 INFO Remoting: Remoting now listens on addresses:
[akka.tcp://sparkDriver@gs-server-v-127:7191]
15/05/19 14:09:01 INFO util.Utils: Successfully started service
'sparkDriver' on port 7191.
15/05/19 14:09:01 INFO spark.SparkEnv: Registering MapOutputTracker
15/05/19 14:09:01 INFO spark.SparkEnv: Registering BlockManagerMaster
15/05/19 14:09:01 INFO storage.DiskBlockManager: Created local
directory at 
/data1/cdh/yarn/nm/usercache/raofengyun/appcache/application_1432015548391_0003/blockmgr-3250910b-693e-46ff-b057-26d552fd8abd
15/05/19 14:09:01 INFO storage.MemoryStore: MemoryStore started with
capacity 259.7 MB
15/05/19 14:09:01 INFO spark.HttpFileServer: HTTP File server
directory is 
/data1/cdh/yarn/nm/usercache/raofengyun/appcache/application_1432015548391_0003/httpd-5bc614bc-d8b1-473d-a807-4d9252eb679d
15/05/19 14:09:01 INFO spark.HttpServer: Starting HTTP Server
15/05/19 14:09:01 INFO server.Server: jetty-8.y.z-SNAPSHOT
15/05/19 14:09:01 INFO server.AbstractConnector: Started
SocketConnector@0.0.0.0:9349
15/05/19 14:09:01 INFO util.Utils: Successfully started service 'HTTP
file server' on port 9349.
15/05/19 14:09:01 INFO spark.SparkEnv: Registering OutputCommitCoordinator
15/05/19 14:09:01 INFO ui.JettyUtils: Adding filter:
org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter
15/05/19 14:09:01 INFO server.Server: jetty-8.y.z-SNAPSHOT
15/05/19 14:09:01 INFO server.AbstractConnector: Started
SelectChannelConnector@0.0.0.0:63023
15/05/19 14:09:01 INFO util.Utils: Successfully started service
'SparkUI' on port 63023.
15/05/19 14:09:01 INFO ui.SparkUI: Started SparkUI at
http://gs-server-v-127:63023
15/05/19 14:09:02 INFO cluster.YarnClusterScheduler: Created
YarnClusterScheduler
15/05/19 14:09:02 INFO netty.NettyBlockTransferService: Server created on 33526
15/05/19 14:09:02 INFO storage.BlockManagerMaster: Trying to register
BlockManager
15/05/19 14:09:02 INFO storage.BlockManagerMasterActor: Registering
block manager gs-server-v-127:33526 with 259.7 MB RAM,
BlockManagerId(driver, gs-server-v-127, 33526)
15/05/19 14:09:02 INFO storage.BlockManagerMaster: Registered BlockManager
15/05/19 14:09:02 INFO scheduler.EventLoggingListener: Logging events
to 
hdfs://gs-server-v-127:8020/user/spark/applicationHistory/application_1432015548391_0003
15/05/19 14:09:02 INFO yarn.ApplicationMaster: Listen to driver:
akka.tcp://sparkDriver@gs-server-v-127:7191/user/YarnScheduler
15/05/19 14:09:02 INFO cluster.YarnClusterSchedulerBackend:
ApplicationMaster registered as
Actor[akka://sparkDriver/user/YarnAM#1902752386]
15/05/19 14:09:02 INFO client.RMProxy: Connecting to ResourceManager
at gs-server-v-127/10.200.200.56:8030
15/05/19 14:09:02 INFO yarn.YarnRMClient: Registering the ApplicationMaster
15/05/19 14:09:03 INFO yarn.YarnAllocator: Will request 2 executor
containers, each with 1 cores and 4480 MB memory 

Re: Spark Streaming graceful shutdown in Spark 1.4

2015-05-19 Thread Tathagata Das
If you wanted to stop it gracefully, then why are you not calling
ssc.stop(stopGracefully = true, stopSparkContext = true)? Then it doesn't
matter whether the shutdown hook was called or not.

TD

On Mon, May 18, 2015 at 9:43 PM, Dibyendu Bhattacharya 
dibyendu.bhattach...@gmail.com wrote:

 Hi,

 Just figured out that if I want to perform a graceful shutdown of Spark
 Streaming 1.4 (from master), Runtime.getRuntime().addShutdownHook no
 longer works. In Spark 1.4 there is a Utils.addShutdownHook defined for
 Spark Core that gets called anyway, which causes the graceful shutdown of
 Spark Streaming to fail with a "SparkContext already closed" error.

 To solve this, I need to explicitly add Utils.addShutdownHook in my
 driver with a higher priority (say 150) than Spark's shutdown priority of
 50, and there I call the streaming context's stop method with the (false, true)
 parameters.

 Just curious to know: is this how we need to handle the shutdown hook
 going forward?

 Can't we make graceful shutdown the default for streaming?

 Also the Java API for adding a shutdown hook in Utils looks very dirty, with
 methods like this:



 Utils.addShutdownHook(150, new Function0<BoxedUnit>() {
  @Override
 public BoxedUnit apply() {
 return null;
 }

 @Override
 public byte apply$mcB$sp() {
 return 0;
 }

 @Override
 public char apply$mcC$sp() {
 return 0;
 }

 @Override
 public double apply$mcD$sp() {
 return 0;
 }

 @Override
 public float apply$mcF$sp() {
 return 0;
 }

 @Override
 public int apply$mcI$sp() {
 // TODO Auto-generated method stub
 return 0;
 }

 @Override
 public long apply$mcJ$sp() {
 return 0;
 }

 @Override
 public short apply$mcS$sp() {
 return 0;
 }

 @Override
 public void apply$mcV$sp() {
  *jsc.stop(false, true);*
  }

 @Override
 public boolean apply$mcZ$sp() {
 // TODO Auto-generated method stub
 return false;
 }
 });



Re: [SparkStreaming] Is it possible to delay the start of some DStream in the application?

2015-05-19 Thread Tathagata Das
If you want the fileStream to start only after a certain event has
happened, why not start the StreamingContext after that event?
TD

On Sun, May 17, 2015 at 7:51 PM, Haopu Wang hw...@qilinsoft.com wrote:

  I want to use a file stream as input. And I looked at the Spark Streaming
 documentation again; it says a file stream doesn't need a receiver at all.

 So I'm wondering if I can control a specific DStream instance.


  --

 *From:* Evo Eftimov [mailto:evo.efti...@isecc.com]
 *Sent:* Monday, May 18, 2015 12:39 AM
 *To:* 'Akhil Das'; Haopu Wang
 *Cc:* 'user'
 *Subject:* RE: [SparkStreaming] Is it possible to delay the start of some
 DStream in the application?



 You can make ANY *standard* receiver sleep by implementing a custom
 Message Deserializer class with a sleep method inside it.



 *From:* Akhil Das [mailto:ak...@sigmoidanalytics.com]
 *Sent:* Sunday, May 17, 2015 4:29 PM
 *To:* Haopu Wang
 *Cc:* user
 *Subject:* Re: [SparkStreaming] Is it possible to delay the start of some
 DStream in the application?



 Why not just trigger your batch job with that event?



 If you really need streaming, then you can create a custom receiver and
 make the receiver sleep till the event has happened. That will obviously
 run your streaming pipelines without having any data to process.
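
 (A minimal Scala sketch of such a receiver, assuming a hypothetical isEventReady()
 check supplied by your application: onStart spins up a thread that sleeps until the
 event fires and only then starts storing data.)

 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.receiver.Receiver

 class DelayedReceiver(isEventReady: () => Boolean)
   extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {

   def onStart(): Unit = {
     new Thread("delayed-receiver") {
       override def run(): Unit = {
         // Sleep until the external event has happened.
         while (!isStopped() && !isEventReady()) Thread.sleep(1000)
         // From here on, produce data as usual.
         while (!isStopped()) {
           store("some record")  // placeholder for real data
           Thread.sleep(100)
         }
       }
     }.start()
   }

   def onStop(): Unit = {}  // the loop above checks isStopped()
 }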


   Thanks

 Best Regards



 On Fri, May 15, 2015 at 4:39 AM, Haopu Wang hw...@qilinsoft.com wrote:

 In my application, I want to start a DStream computation only after a
 special event has happened (for example, I want to start the receiver
 only after the reference data has been properly initialized).

 My question is: it looks like the DStream will be started right after
 the StreamingContext has been started. Is it possible to delay the start
 of a specific DStream?

 Thank you very much!

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org





Re: Broadcast variables can be rebroadcast?

2015-05-19 Thread N B
Hi Imran,

If I understood you correctly, you are suggesting to simply call broadcast
again from the driver program. This is exactly what I am hoping will work
as I have the Broadcast data wrapped up and I am indeed (re)broadcasting
the wrapper over again when the underlying data changes. However,
documentation seems to suggest that one cannot re-broadcast. Is my
understanding accurate?

Thanks
NB


On Mon, May 18, 2015 at 6:24 PM, Imran Rashid iras...@cloudera.com wrote:

 Rather than updating the broadcast variable, can't you simply create a
 new one?  When the old one can be gc'ed in your program, it will also get
 gc'ed from spark's cache (and all executors).

 I think this will make your code *slightly* more complicated, as you need
 to add in another layer of indirection for which broadcast variable to use,
 but not too bad.  Eg., from

 var myBroadcast = sc.broadcast( ...)
 (0 to 20).foreach{ iteration =>
   //  ... some rdd operations that involve myBroadcast ...
   myBroadcast.update(...) // wrong! don't update a broadcast variable
 }

 instead do something like:

 def oneIteration(myRDD: RDD[...], myBroadcastVar: Broadcast[...]): Unit = {
  ...
 }

 var myBroadcast = sc.broadcast(...)
 (0 to 20).foreach { iteration =>
   oneIteration(myRDD, myBroadcast)
   myBroadcast = sc.broadcast(...) // reassign (no `var` here, or it would just shadow)
 to create a NEW broadcast, with whatever you need to update it
 }
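
 (A self-contained sketch of this pattern in Scala, with a hypothetical
 loadLookupData() refresh function; unpersisting the old broadcast is optional,
 it just frees executor memory sooner than waiting for GC.)

 import org.apache.spark.SparkContext
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.rdd.RDD

 def rebroadcastLoop(sc: SparkContext, myRDD: RDD[String],
                     loadLookupData: () => Map[String, String]): Unit = {
   var lookup: Broadcast[Map[String, String]] = sc.broadcast(loadLookupData())
   (0 to 20).foreach { _ =>
     val current = lookup  // capture the current broadcast in a val for the closure
     val hits = myRDD.filter(k => current.value.contains(k)).count()
     println(s"hits = $hits")
     // Replace the broadcast with a fresh one instead of mutating it.
     val old = lookup
     lookup = sc.broadcast(loadLookupData())
     old.unpersist(blocking = false)  // drop the stale copy from the executors
   }
 }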

 On Sat, May 16, 2015 at 2:01 AM, N B nb.nos...@gmail.com wrote:

 Thanks Ayan. Can we rebroadcast after updating in the driver?

 Thanks
 NB.


 On Fri, May 15, 2015 at 6:40 PM, ayan guha guha.a...@gmail.com wrote:

 Hi

 A broadcast variable is shipped to the executors used by a transformation the
 first time it is accessed in that transformation. It will NOT be updated
 subsequently, even if the value has changed. However, a new value will be
 shipped to any new executor that comes into play after the value has changed.
 For this reason, changing the value of a broadcast variable is not a good idea,
 as it can create inconsistency within the cluster. From the documentation:

  In addition, the object v should not be modified after it is broadcast
 in order to ensure that all nodes get the same value of the broadcast
 variable


 On Sat, May 16, 2015 at 10:39 AM, N B nb.nos...@gmail.com wrote:

 Thanks Ilya. Does one have to call broadcast again once the underlying
 data is updated in order to get the changes visible on all nodes?

 Thanks
 NB


 On Fri, May 15, 2015 at 5:29 PM, Ilya Ganelin ilgan...@gmail.com
 wrote:

 The broadcast variable is like a pointer. If the underlying data
 changes then the changes will be visible throughout the cluster.
 On Fri, May 15, 2015 at 5:18 PM NB nb.nos...@gmail.com wrote:

 Hello,

 Once a broadcast variable is created using sparkContext.broadcast(),
 can it
 ever be updated again? The use case is for something like the
 underlying
 lookup data changing over time.

 Thanks
 NB




 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Broadcast-variables-can-be-rebroadcast-tp22908.html
 Sent from the Apache Spark User List mailing list archive at
 Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org





 --
 Best Regards,
 Ayan Guha






Re: Spark Streaming graceful shutdown in Spark 1.4

2015-05-19 Thread Dibyendu Bhattacharya
You mean to say within Runtime.getRuntime().addShutdownHook I call
ssc.stop(stopSparkContext = true, stopGracefully = true)?

This won't work anymore in 1.4.

The SparkContext gets stopped before the Receiver has processed all received blocks,
and I see the exception below in the logs. But if I add Utils.addShutdownHook
with the priority I mentioned, then graceful shutdown works; in that case the
shutdown hooks run in priority order.



*INFO : org.apache.spark.streaming.scheduler.ReceiverTracker - Sent stop
signal to all 3 receivers*
ERROR: org.apache.spark.streaming.scheduler.ReceiverTracker - Deregistered
receiver for stream 0: Stopped by driver
ERROR: org.apache.spark.streaming.scheduler.ReceiverTracker - Deregistered
receiver for stream 1: Stopped by driver
ERROR: org.apache.spark.streaming.scheduler.ReceiverTracker - Deregistered
receiver for stream 2: Stopped by driver
*INFO : org.apache.spark.SparkContext - Invoking stop() from shutdown hook*
INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped
o.s.j.s.ServletContextHandler{/streaming/batch/json,null}
INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped
o.s.j.s.ServletContextHandler{/streaming/batch,null}
INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped
o.s.j.s.ServletContextHandler{/streaming/json,null}
INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped
o.s.j.s.ServletContextHandler{/streaming,null}
INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped
o.s.j.s.ServletContextHandler{/metrics/json,null}
INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped
o.s.j.s.ServletContextHandler{/stages/stage/kill,null}
INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped
o.s.j.s.ServletContextHandler{/,null}
INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped
o.s.j.s.ServletContextHandler{/static,null}
INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped
o.s.j.s.ServletContextHandler{/executors/threadDump/json,null}
INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped
o.s.j.s.ServletContextHandler{/executors/threadDump,null}
INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped
o.s.j.s.ServletContextHandler{/executors/json,null}
INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped
o.s.j.s.ServletContextHandler{/executors,null}
INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped
o.s.j.s.ServletContextHandler{/environment/json,null}
INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped
o.s.j.s.ServletContextHandler{/environment,null}
INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped
o.s.j.s.ServletContextHandler{/storage/rdd/json,null}
INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped
o.s.j.s.ServletContextHandler{/storage/rdd,null}
INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped
o.s.j.s.ServletContextHandler{/storage/json,null}
INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped
o.s.j.s.ServletContextHandler{/storage,null}
INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped
o.s.j.s.ServletContextHandler{/stages/pool/json,null}
INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped
o.s.j.s.ServletContextHandler{/stages/pool,null}
INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped
o.s.j.s.ServletContextHandler{/stages/stage/json,null}
INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped
o.s.j.s.ServletContextHandler{/stages/stage,null}
INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped
o.s.j.s.ServletContextHandler{/stages/json,null}
INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped
o.s.j.s.ServletContextHandler{/stages,null}
INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped
o.s.j.s.ServletContextHandler{/jobs/job/json,null}
INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped
o.s.j.s.ServletContextHandler{/jobs/job,null}
INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped
o.s.j.s.ServletContextHandler{/jobs/json,null}
INFO : org.spark-project.jetty.server.handler.ContextHandler - stopped
o.s.j.s.ServletContextHandler{/jobs,null}
INFO : org.apache.spark.ui.SparkUI - Stopped Spark web UI at
http://10.252.5.113:4040
INFO : org.apache.spark.scheduler.DAGScheduler - Stopping DAGScheduler
INFO : org.apache.spark.scheduler.DAGScheduler - Job 4 failed: start at
Consumer.java:122, took 10.398746 s
*Exception in thread Thread-28 org.apache.spark.SparkException: Job
cancelled because SparkContext was shut down*
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:736)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:735)
INFO : org.apache.spark.scheduler.DAGScheduler 

Re: org.apache.spark.shuffle.FetchFailedException :: Migration from Spark 1.2 to 1.3

2015-05-19 Thread Akhil Das
There was some similar discussion on JIRA
(https://issues.apache.org/jira/browse/SPARK-3633); maybe that will give you
some insights.

Thanks
Best Regards

On Mon, May 18, 2015 at 10:49 PM, zia_kayani zia.kay...@platalytics.com
wrote:

 Hi, I'm getting this exception after shifting my code from Spark 1.2 to
 Spark
 1.3

 15/05/18 18:22:39 WARN TaskSetManager: Lost task 0.0 in stage 1.6 (TID 84,
 cloud8-server): FetchFailed(BlockManagerId(1, cloud4-server, 7337),
 shuffleId=0, mapId=9, reduceId=1, message=
 org.apache.spark.shuffle.FetchFailedException: java.lang.RuntimeException:
 Failed to open file:

 /tmp/spark-fff63849-a318-4e48-bdea-2f563076ad5d/spark-40ba3a41-0f4d-446e-b806-e788e210d394/spark-a3d61f7a-22e9-4b3b-9346-ff3b70d0e43d/blockmgr-0e3b2b5d-f677-4e91-b98b-ed913adbd15f/39/shuffle_0_9_0.index
 at

 org.apache.spark.network.shuffle.ExternalShuffleBlockManager.getSortBasedShuffleBlockData(ExternalShuffleBlockManager.java:202)
 at

 org.apache.spark.network.shuffle.ExternalShuffleBlockManager.getBlockData(ExternalShuffleBlockManager.java:112)
 at

 org.apache.spark.network.shuffle.ExternalShuffleBlockHandler.receive(ExternalShuffleBlockHandler.java:74)
 at

 org.apache.spark.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:124)
 at

 org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:97)
 at

 org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:91)
 at

 org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:44)
 at

 io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
 at

 io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
 at

 io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
 at

 io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
 at

 io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
 at

 io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
 at

 io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:163)
 at

 io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
 at

 io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
 at

 io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:787)
 at

 io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:130)
 at
 io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
 at

 io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
 at

 io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
 at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
 at

 io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
 at java.lang.Thread.run(Thread.java:745)
 Caused by: java.io.FileNotFoundException:

 /tmp/spark-fff63849-a318-4e48-bdea-2f563076ad5d/spark-40ba3a41-0f4d-446e-b806-e788e210d394/spark-a3d61f7a-22e9-4b3b-9346-ff3b70d0e43d/blockmgr-0e3b2b5d-f677-4e91-b98b-ed913adbd15f/39/shuffle_0_9_0.index
 (Permission denied)
 at java.io.FileInputStream.open(Native Method)
 at java.io.FileInputStream.init(FileInputStream.java:146)
 at

 org.apache.spark.network.shuffle.ExternalShuffleBlockManager.getSortBasedShuffleBlockData(ExternalShuffleBlockManager.java:191)
 ... 23 more



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/org-apache-spark-shuffle-FetchFailedException-Migration-from-Spark-1-2-to-1-3-tp22937.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




group by and distinct performance issue

2015-05-19 Thread Peer, Oded
I am running Spark over Cassandra to process a single table.
My task reads a single day's worth of data from the table and performs 50 group 
by and distinct operations, counting distinct userIds by different grouping 
keys.
My code looks like this:

   JavaRDD<Row> rdd = sc.parallelize().mapPartitions().cache() // reads the 
data from the table
   for each groupingKey {
      JavaPairRDD<GroupingKey, UserId> groupByRdd = rdd.mapToPair();
      JavaPairRDD<GroupingKey, Long> countRdd = 
groupByRdd.distinct().mapToPair().reduceByKey() // counts distinct values per 
grouping key
   }

The distinct() stage takes about 2 minutes for every groupByValue, and my task 
takes well over an hour to complete.
My cluster has 4 nodes and 30 GB of RAM per Spark process, the table size is 4 
GB.

How can I identify the bottleneck more accurately? Is it caused by shuffling 
data?
How can I improve the performance?

Thanks,
Oded
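
(One direction worth benchmarking, sketched in Scala with String/Long standing in
for the GroupingKey and UserId types above: replace distinct() + reduceByKey() with
a single aggregateByKey pass that builds the set of userIds per key, so each grouping
key costs one shuffle instead of two. This is only a sketch of an alternative, not
the code above.)

import scala.collection.mutable.HashSet
import org.apache.spark.rdd.RDD

def countDistinctPerKey(pairs: RDD[(String, Long)]): RDD[(String, Long)] = {
  pairs.aggregateByKey(HashSet.empty[Long])(
    (set, id) => { set += id; set },   // add a userId to the per-key set
    (s1, s2)  => { s1 ++= s2; s1 }     // merge partial sets from different partitions
  ).mapValues(_.size.toLong)
}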


spark streaming doubt

2015-05-19 Thread Shushant Arora
What happens in a streaming application if one job is not yet finished when the
stream interval is reached? Does it start the next job, or does it wait for the
first to finish while the remaining jobs keep accumulating in a queue?


Say I have a streaming application with a stream interval of 1 sec, but my
job takes 2 min to process 1 sec of stream; what will happen? At any time
will there be only one job running, or multiple?


Re: Spark and Flink

2015-05-19 Thread Pa Rö
That sounds good; maybe you can send me a pseudo structure, as this is my first
maven project.

best regards,
paul

2015-05-18 14:05 GMT+02:00 Robert Metzger rmetz...@apache.org:

 Hi,
 I would really recommend you to put your Flink and Spark dependencies into
 different maven modules.
 Having them both in the same project will be very hard, if not impossible.
 Both projects depend on similar projects with slightly different versions.

 I would suggest a maven module structure like this:
 yourproject-parent (a pom module)
 -- yourproject-common
 -- yourproject-flink
 -- yourproject-spark



 On Mon, May 18, 2015 at 10:00 AM, Pa Rö paul.roewer1...@googlemail.com
 wrote:

 Hi,
 if I add your dependency I get over 100 errors; now I changed the version
 number:
 <dependencies>
   <dependency>
     <groupId>com.fasterxml.jackson.module</groupId>
     <artifactId>jackson-module-scala_2.10</artifactId>
     <version>2.4.4</version>
     <exclusions>
       <exclusion>
         <groupId>com.google.guava</groupId>
         <artifactId>guava</artifactId>
       </exclusion>
     </exclusions>
   </dependency>

 now the pom is fine, but I get the same error when running Spark:
 WARN component.AbstractLifeCycle: FAILED
 org.eclipse.jetty.servlet.DefaultServlet-608411067:
 java.lang.NoSuchMethodError:
 org.eclipse.jetty.server.ResourceCache.init(Lorg/eclipse/jetty/http/MimeTypes;)V

 java.lang.NoSuchMethodError:
 org.eclipse.jetty.server.ResourceCache.init(Lorg/eclipse/jetty/http/MimeTypes;)V
 at
 org.eclipse.jetty.servlet.NIOResourceCache.init(NIOResourceCache.java:41)
 at
 org.eclipse.jetty.servlet.DefaultServlet.init(DefaultServlet.java:223)
 at javax.servlet.GenericServlet.init(GenericServlet.java:244)
 at
 org.eclipse.jetty.servlet.ServletHolder.initServlet(ServletHolder.java:442)
 at
 org.eclipse.jetty.servlet.ServletHolder.doStart(ServletHolder.java:270)
 at
 org.eclipse.jetty.util.component.AbstractLifeCycle.start(AbstractLifeCycle.java:64)
 at
 org.eclipse.jetty.servlet.ServletHandler.initialize(ServletHandler.java:721)
 at
 org.eclipse.jetty.servlet.ServletContextHandler.startContext(ServletContextHandler.java:279)
 at
 org.eclipse.jetty.server.handler.ContextHandler.doStart(ContextHandler.java:717)
 at
 org.eclipse.jetty.servlet.ServletContextHandler.doStart(ServletContextHandler.java:155)
 at
 org.eclipse.jetty.util.component.AbstractLifeCycle.start(AbstractLifeCycle.java:64)
 at
 org.eclipse.jetty.server.handler.HandlerCollection.doStart(HandlerCollection.java:229)
 at
 org.eclipse.jetty.server.handler.ContextHandlerCollection.doStart(ContextHandlerCollection.java:172)
 at
 org.eclipse.jetty.util.component.AbstractLifeCycle.start(AbstractLifeCycle.java:64)
 at
 org.eclipse.jetty.server.handler.HandlerWrapper.doStart(HandlerWrapper.java:95)
 at org.eclipse.jetty.server.Server.doStart(Server.java:282)
 at
 org.eclipse.jetty.util.component.AbstractLifeCycle.start(AbstractLifeCycle.java:64)
 at
 org.apache.spark.ui.JettyUtils$.org$apache$spark$ui$JettyUtils$$connect$1(JettyUtils.scala:199)
 at
 org.apache.spark.ui.JettyUtils$$anonfun$4.apply(JettyUtils.scala:209)
 at
 org.apache.spark.ui.JettyUtils$$anonfun$4.apply(JettyUtils.scala:209)
 at
 org.apache.spark.util.Utils$$anonfun$startServiceOnPort$1.apply$mcVI$sp(Utils.scala:1454)
 at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
 at org.apache.spark.util.Utils$.startServiceOnPort(Utils.scala:1450)
 at
 org.apache.spark.ui.JettyUtils$.startJettyServer(JettyUtils.scala:209)
 at org.apache.spark.ui.WebUI.bind(WebUI.scala:102)
 at org.apache.spark.SparkContext.init(SparkContext.scala:224)
 at
 org.apache.spark.api.java.JavaSparkContext.init(JavaSparkContext.scala:53)
 at
 mgm.tp.bigdata.tempGeoKmeans.Spark.SparkMain.main(SparkMain.java:37)
 ...

 What am I doing wrong?

 best regards
 paul

 2015-05-13 15:43 GMT+02:00 Ted Yu yuzhih...@gmail.com:

 You can use exclusion to remove the undesired jetty version.
 Here is syntax:
   <dependency>
     <groupId>com.fasterxml.jackson.module</groupId>
     <artifactId>jackson-module-scala_2.10</artifactId>
     <version>${fasterxml.jackson.version}</version>
     <exclusions>
       <exclusion>
         <groupId>com.google.guava</groupId>
         <artifactId>guava</artifactId>
       </exclusion>
     </exclusions>
   </dependency>

 On Wed, May 13, 2015 at 6:41 AM, Paul Röwer 
 paul.roewer1...@googlemail.com wrote:

 Okay. And how do I get it clean in my maven project?


 On 13 May 2015 at 15:15:34 CEST, Ted Yu yuzhih...@gmail.com wrote:

 You can run the following command:
 mvn dependency:tree

 And see what jetty versions are brought in.

 Cheers



 On May 13, 2015, at 6:07 AM, Pa Rö paul.roewer1...@googlemail.com
 wrote:

 Hi,

 I use Spark and Flink in the same maven project.

 Now I get an exception when working with Spark; Flink works well.

 The problem is transitive dependencies.

 

Re: spark streaming doubt

2015-05-19 Thread Akhil Das
It will be a single job running at a time by default (you can also
configure spark.streaming.concurrentJobs to run jobs in parallel, which is
not recommended in production).

Now, with your batch duration being 1 sec and your processing time being 2 minutes,
if you are using receiver-based streaming then those receivers will keep on
receiving data while the job is running (the data will accumulate in memory if you
set the StorageLevel to MEMORY_ONLY, and you will end up with "block not found"
exceptions as Spark drops blocks that are yet to be processed in order to
accommodate new blocks). If you are using a non-receiver-based approach, you
will not have this problem of dropped blocks.

Ideally, if your data is small and you have enough memory to hold your data
then it will run smoothly without any issues.

Thanks
Best Regards

On Tue, May 19, 2015 at 1:23 PM, Shushant Arora shushantaror...@gmail.com
wrote:

  What happens in a streaming application if one job is not yet finished when
  the stream interval is reached? Does it start the next job, or does it wait
  for the first to finish while the remaining jobs keep accumulating in a queue?


  Say I have a streaming application with a stream interval of 1 sec, but my
  job takes 2 min to process 1 sec of stream; what will happen? At any time
  will there be only one job running, or multiple?




Spark 1.3.1 Performance Tuning/Patterns for Data Generation Heavy/Throughput Jobs

2015-05-19 Thread Night Wolf
Hi all,

I have a job that, for every row, creates about 20 new objects (i.e. RDD of
100 rows in = RDD 2000 rows out). The reason for this is each row is tagged
with a list of the 'buckets' or 'windows' it belongs to.

The actual data is about 10 billion rows. Each executor has 60GB of memory.

Currently I have a mapPartitions task that is doing this object creation in
a Scala Map and then returning the HashMap as an iterator via .toIterator.

Is there a more efficient way to do this (assuming I can't use something
like flatMap).
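
(A sketch in Scala of one alternative, with hypothetical Row/Tagged types and a
placeholder bucketsFor() tagging function: instead of materializing a HashMap per
partition, flatMap over the partition's iterator inside mapPartitions so output
objects are produced lazily and become garbage sooner. Not the original code, just
one direction to try.)

import org.apache.spark.rdd.RDD

case class Row(id: Long, value: Double)
case class Tagged(bucket: Int, row: Row)
def bucketsFor(r: Row): Seq[Int] = 0 until 20  // placeholder tagging logic

def tagLazily(rows: RDD[Row]): RDD[Tagged] =
  rows.mapPartitions { it =>
    // flatMap on the *iterator* (not the RDD), so each output record is
    // produced on demand rather than accumulated in a per-partition map.
    it.flatMap(r => bucketsFor(r).iterator.map(b => Tagged(b, r)))
  }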

The job runs (assuming each task size is small enough). But the GC time is
understandably off the charts.

I've reduced the spark cache memory percentage to 0.05 (as I just need
space for a few broadcasts and this is a data churn task). I've left the
shuffle memory percent unchanged.

What kinds of settings should I be tuning with regards to GC?

Looking at
https://spark-summit.org/2014/wp-content/uploads/2015/03/SparkSummitEast2015-AdvDevOps-StudentSlides.pdf,
slide 125 recommends some settings, but I'm not sure what would be best here. I
tried using -XX:+UseG1GC but it pretty much causes my job to fail (all the
executors die). Are there any tips with respect to the ratio of new gen and
old gen space when creating lots of objects which will live in a data
structure until the entire partition is processed?

Any tips for tuning these kinds of jobs would be helpful!

Thanks,
~N


Re: py-files (and others?) not properly set up in cluster-mode Spark Yarn job?

2015-05-19 Thread Shay Rojansky
Thanks for the quick response and confirmation, Marcelo, I just opened
https://issues.apache.org/jira/browse/SPARK-7725.

On Mon, May 18, 2015 at 9:02 PM, Marcelo Vanzin van...@cloudera.com wrote:

 Hi Shay,

 Yeah, that seems to be a bug; it doesn't seem to be related to the default
 FS nor compareFs either - I can reproduce this with HDFS when copying files
 from the local fs too. In yarn-client mode things seem to work.

 Could you file a bug to track this? If you don't have a jira account I can
 do that for you.


 On Mon, May 18, 2015 at 9:38 AM, Shay Rojansky r...@roji.org wrote:

 I'm having issues with submitting a Spark Yarn job in cluster mode when
 the cluster filesystem is file:///. It seems that additional resources
 (--py-files) are simply being skipped and not being added into the
 PYTHONPATH. The same issue may also exist for --jars, --files, etc.

 We use a simple NFS mount on all our nodes instead of HDFS. The problem
 is that when I submit a job that has files (via --py-files), these don't
 get copied across to the application's staging directory, nor do they get
 added to the PYTHONPATH. On startup, I can clearly see the message "Source
 and destination file systems are the same. Not copying", which is a result
 of the check here:
 https://github.com/apache/spark/blob/master/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala#L221

 The compareFs function simply looks at whether the scheme, host and port are
 the same, and if so (my case), simply skips the copy. While that in itself
 isn't a problem, the PYTHONPATH isn't updated either.




 --
 Marcelo



Re: spark streaming doubt

2015-05-19 Thread Akhil Das
spark.streaming.concurrentJobs takes an integer value, not a boolean. If you
set it to 2 then 2 jobs will run in parallel. The default value is 1, and the
next job will start once the current one completes.
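
(For example, a minimal sketch of setting it on the SparkConf before the
StreamingContext is created; the value is passed as a string, and the app name is
just a placeholder.)

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf()
  .setAppName("my-streaming-app")              // placeholder app name
  .set("spark.streaming.concurrentJobs", "2")  // experimental, undocumented property
val ssc = new StreamingContext(conf, Seconds(1))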


  Actually, in the current implementation of Spark Streaming and under the
  default configuration, only one job is active (i.e. under execution) at any
  point of time. So if one batch's processing takes longer than 10 seconds,
  then the next batch's jobs will stay queued.
  This can be changed with an experimental Spark property
  spark.streaming.concurrentJobs which is by default set to 1. It's not
  currently documented (maybe I should add it).
  The reason it is set to 1 is that concurrent jobs can potentially lead to
  weird sharing of resources, which can make it hard to debug whether
  there are sufficient resources in the system to process the ingested data
  fast enough. With only 1 job running at a time, it is easy to see that if
  batch processing time < batch interval, then the system will be stable.
  Granted that this may not be the most efficient use of resources under
  certain conditions. We definitely hope to improve this in the future.


Copied from TD's answer written in SO
http://stackoverflow.com/questions/23528006/how-jobs-are-assigned-to-executors-in-spark-streaming
.

Examples of non-receiver-based streaming are the fileStream and directStream
inputs. You can read a bit more about them here:
https://spark.apache.org/docs/1.3.1/streaming-kafka-integration.html
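
(A small Scala sketch of the two non-receiver-based inputs mentioned above, assuming
the spark-streaming-kafka artifact for Spark 1.3 is on the classpath and using
placeholder paths, brokers and topic names.)

import kafka.serializer.StringDecoder
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.kafka.KafkaUtils

def nonReceiverInputs(ssc: StreamingContext): Unit = {
  // File-based input: no receiver; new files in the directory are picked up each batch.
  val files = ssc.textFileStream("hdfs:///some/input/dir")  // placeholder path
  files.count().print()

  // Kafka direct stream: no receiver; Spark tracks the offsets itself.
  val kafkaParams = Map("metadata.broker.list" -> "broker1:9092")  // placeholder broker
  val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
    ssc, kafkaParams, Set("my-topic"))                             // placeholder topic
  messages.map(_._2).count().print()
}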

Thanks
Best Regards

On Tue, May 19, 2015 at 2:13 PM, Shushant Arora shushantaror...@gmail.com
wrote:

 Thanks Akhil.
 When I don't set spark.streaming.concurrentJobs to true, will all the pending
 jobs start one by one after the first job completes, or does it simply not
 create the jobs that could not be started at their desired interval?

 And what's the difference and usage of receiver vs non-receiver based
 streaming? Is there any documentation for that?

 On Tue, May 19, 2015 at 1:35 PM, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 It will be a single job running at a time by default (you can also
 configure spark.streaming.concurrentJobs to run jobs in parallel, which is
 not recommended in production).

 Now, with your batch duration being 1 sec and your processing time being 2 minutes,
 if you are using receiver-based streaming then those receivers will keep on
 receiving data while the job is running (the data will accumulate in memory if you
 set the StorageLevel to MEMORY_ONLY, and you will end up with "block not found"
 exceptions as Spark drops blocks that are yet to be processed in order to
 accommodate new blocks). If you are using a non-receiver-based approach, you
 will not have this problem of dropped blocks.

 Ideally, if your data is small and you have enough memory to hold your
 data then it will run smoothly without any issues.

 Thanks
 Best Regards

 On Tue, May 19, 2015 at 1:23 PM, Shushant Arora 
 shushantaror...@gmail.com wrote:

  What happens in a streaming application if one job is not yet finished
  when the stream interval is reached? Does it start the next job, or does it
  wait for the first to finish while the remaining jobs keep accumulating in a queue?


  Say I have a streaming application with a stream interval of 1 sec, but my
  job takes 2 min to process 1 sec of stream; what will happen? At any time
  will there be only one job running, or multiple?






Re: TwitterUtils on Windows

2015-05-19 Thread Steve Loughran

 On 19 May 2015, at 03:08, Justin Pihony justin.pih...@gmail.com wrote:
 
 
 15/05/18 22:03:14 INFO Executor: Fetching
 http://192.168.56.1:49752/jars/twitter4j-media-support-3.0.3.jar with
 timestamp 1432000973058
 15/05/18 22:03:14 INFO Utils: Fetching
 http://192.168.56.1:49752/jars/twitter4j-media-support-3.0.3.jar to
 C:\Users\Justin\AppData\Local\Temp\spark-4a37d3
 e9-34a2-40d4-b09b-6399931f527d\userFiles-65ee748e-4721-4e16-9fe6-65933651fec1\fetchFileTemp8970201232303518432.tmp
 15/05/18 22:03:14 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
 java.lang.NullPointerException
at java.lang.ProcessBuilder.start(ProcessBuilder.java:1012)
at org.apache.hadoop.util.Shell.runCommand(Shell.java:482)
at org.apache.hadoop.util.Shell.run(Shell.java:455)
at
 org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.ja
 va:715)
at org.apache.hadoop.fs.FileUtil.chmod(FileUtil.java:873)
at org.apache.hadoop.fs.FileUtil.chmod(FileUtil.java:853)
at org.apache.spark.util.Utils$.fetchFile(Utils.scala:443)
at

you're going to need to set up Hadoop on your system enough for it to execute the 
chmod operation via winutils.exe

one tactic: grab the hortonworks windows version, install it (including setting 
up HADOOP_HOME). You don't need to run any of the hadoop services, you just 
need the binaries in the right place.

other: 

1. grab the copy of the relevant binaries which I've stuck up online

https://github.com/steveloughran/clusterconfigs/tree/master/clusters/morzine/hadoop_home/bin
2. install to some directory hadoop/bin
3. set the env variable HADOOP_HOME to the hadoop dir (not the bin one)
4. set PATH=%PATH%;%HADOOP_HOME%/bin
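
(An alternative some people use from code, sketched here with a placeholder path:
Hadoop's Shell utility also honours the hadoop.home.dir system property, so you can
point it at the directory containing bin\winutils.exe before the SparkContext is
created. The path is an assumption about your layout, not a required location.)

// Scala / spark-shell sketch; C:\hadoop is a placeholder and must contain bin\winutils.exe
System.setProperty("hadoop.home.dir", "C:\\hadoop")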

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Working with slides. How do I know how many times a RDD has been processed?

2015-05-19 Thread Guillermo Ortiz
I tried to insert a flag in the RDD, so I could set a counter in the last
position; when the counter reaches X, I could do something. But in each slide
the original RDD comes back, even though I modified it.

I wrote this code to check whether this is possible, but it doesn't work.

val rdd1WithFlag = rdd1.map { register =>
  var splitRegister = register._2.split("\\|")
  var newArray = new Array[String](splitRegister.length + 1)

  if (splitRegister.length == 2) {
    splitRegister.copyToArray(newArray)
    newArray(splitRegister.length) = "0"
  } else {
    splitRegister(splitRegister.length) = "1"
    splitRegister.copyToArray(newArray)
  }

  (splitRegister(1), newArray)
}

If I check the length of splitRegister, it is always 2 in each slide; it is
never three.



2015-05-18 15:36 GMT+02:00 Guillermo Ortiz konstt2...@gmail.com:

 Hi,

  I have two streaming RDDs, RDD1 and RDD2, and I want to cogroup them.
  The data doesn't arrive at the same time and can sometimes come with some
  delay.
  When I have all the data I want to insert it into MongoDB.

  For example, imagine that I get:
  RDD1 -- T 0
  RDD2 -- T 0.5
  I cogroup them, but I can't store to Mongo yet because more data could
  come in the next window/slide.
  RDD2' -- T 1.5
  Another RDD2' comes, and I only want to save to Mongo once. So I should only
  save when I have all the data. What I do know is the maximum time I should
  wait.

  Ideally, I would like to save to MongoDB in the last slide for each RDD,
  when I know that it is not possible to get more RDD2 data to join with RDD1.
  Is that possible? How?

  Maybe there is another way to solve this problem; any ideas?





AvroParquetWriter equivalent in Spark 1.3 sqlContext Save or createDataFrame Interfaces?

2015-05-19 Thread Ewan Leith
Hi all,

I might be missing something, but does the new Spark 1.3 sqlContext save 
interface support using Avro as the schema structure when writing Parquet 
files, in a similar way to AvroParquetWriter (which I've got working)?

I've seen how you can load an avro file and save it as parquet from 
https://databricks.com/blog/2015/03/24/spark-sql-graduates-from-alpha-in-spark-1-3.html,
 but not using the 2 together.

Thanks, and apologies if I've missed something obvious!

Ewan


How to use spark to access HBase with Security enabled

2015-05-19 Thread donhoff_h
Hi, experts.

I ran the HBaseTest program which is an example from the Apache Spark source 
code to learn how to use spark to access HBase. But I met the following 
exception:
Exception in thread main 
org.apache.hadoop.hbase.client.RetriesExhaustedException: Failed after 
attempts=36, exceptions:
Tue May 19 16:59:11 CST 2015, null, java.net.SocketTimeoutException: 
callTimeout=6, callDuration=68648: row 'spark_t01,,00' on table 
'hbase:meta' at region=hbase:meta,,1.1588230740, 
hostname=bgdt01.dev.hrb,16020,1431412877700, seqNum=0

I also checked the RegionServer Log of the host bgdt01.dev.hrb listed in the 
above exception. I found a few entries like the following one:
2015-05-19 16:59:11,143 DEBUG 
[RpcServer.reader=2,bindAddress=bgdt01.dev.hrb,port=16020] ipc.RpcServer: 
RpcServer.listener,port=16020: Caught exception while reading:Authentication is 
required 

The above entry does not clearly point to my program, but the time is very close. 
Since my HBase version is 1.0.0 and I have security enabled, I suspect the 
exception was caused by Kerberos authentication, but I am not sure.

Does anybody know if my guess is right? And if so, could anybody tell me how to 
set up Kerberos authentication in a Spark program? I don't know how to do it. I 
have already checked the API docs but did not find any useful API. Many 
thanks!

By the way, my spark version is 1.3.0. I also paste the code of HBaseTest in 
the following:
***Source Code**
object HBaseTest {
  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setAppName("HBaseTest")
val sc = new SparkContext(sparkConf)
val conf = HBaseConfiguration.create()
conf.set(TableInputFormat.INPUT_TABLE, args(0))

// Initialize hBase table if necessary
val admin = new HBaseAdmin(conf)
if (!admin.isTableAvailable(args(0))) {
  val tableDesc = new HTableDescriptor(args(0))
  admin.createTable(tableDesc)
}


val hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
  classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
  classOf[org.apache.hadoop.hbase.client.Result])


hBaseRDD.count()


sc.stop()
  }
}

Re: AvroParquetWriter equivalent in Spark 1.3 sqlContext Save or createDataFrame Interfaces?

2015-05-19 Thread Cheng Lian
That's right. Also, Spark SQL can automatically infer schema from JSON 
datasets. You don't need to specify an Avro schema:


sqlContext.jsonFile("json/path").saveAsParquetFile("parquet/path")

or with the new reader/writer API introduced in 1.4-SNAPSHOT:

   sqlContext.read.json("json/path").write.parquet("parquet/path")

Cheng
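
(For the "new dataframe creation" case discussed below, a minimal Spark 1.3 sketch in
Scala that builds a DataFrame from an explicit StructType, with placeholder field
names, data and output path, and writes it as Parquet.)

import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

def writeParquetWithStructType(sqlContext: SQLContext): Unit = {
  val sc = sqlContext.sparkContext
  // Placeholder schema and rows standing in for whatever the JSON/CSV input holds.
  val schema = StructType(Seq(
    StructField("id", IntegerType, nullable = false),
    StructField("name", StringType, nullable = true)))
  val rows = sc.parallelize(Seq(Row(1, "alice"), Row(2, "bob")))
  val df = sqlContext.createDataFrame(rows, schema)
  df.saveAsParquetFile("parquet/path")  // placeholder output path (Spark 1.3 API)
}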

On 5/19/15 6:07 PM, Ewan Leith wrote:


Thanks Cheng, that makes sense.

So for new dataframe creation (not conversion from Avro but from JSON 
or CSV inputs) in Spark we shouldn’t worry about using Avro at all, 
just use the Spark SQL StructType when building new Dataframes? If so, 
that will be a lot simpler!


Thanks,

Ewan

*From:*Cheng Lian [mailto:lian.cs@gmail.com]
*Sent:* 19 May 2015 11:01
*To:* Ewan Leith; user@spark.apache.org
*Subject:* Re: AvroParquetWriter equivalent in Spark 1.3 sqlContext 
Save or createDataFrame Interfaces?


Hi Ewan,

Different from AvroParquetWriter, in Spark SQL we uses StructType as 
the intermediate schema format. So when converting Avro files to 
Parquet files, we internally converts Avro schema to Spark SQL 
StructType first, and then convert StructType to Parquet schema.


Cheng

On 5/19/15 4:42 PM, Ewan Leith wrote:

Hi all,

I might be missing something, but does the new Spark 1.3
sqlContext save interface support using Avro as the schema
structure when writing Parquet files, in a similar way to
AvroParquetWriter (which I’ve got working)?

I've seen how you can load an avro file and save it as parquet

fromhttps://databricks.com/blog/2015/03/24/spark-sql-graduates-from-alpha-in-spark-1-3.html,
but not using the 2 together.

Thanks, and apologies if I've missed something obvious!

Ewan





RE: AvroParquetWriter equivalent in Spark 1.3 sqlContext Save or createDataFrame Interfaces?

2015-05-19 Thread Ewan Leith
Thanks Cheng, that's brilliant, you've saved me a headache.

Ewan

From: Cheng Lian [mailto:lian.cs@gmail.com]
Sent: 19 May 2015 11:58
To: Ewan Leith; user@spark.apache.org
Subject: Re: AvroParquetWriter equivalent in Spark 1.3 sqlContext Save or 
createDataFrame Interfaces?

That's right. Also, Spark SQL can automatically infer schema from JSON 
datasets. You don't need to specify an Avro schema:

   sqlContext.jsonFile(json/path).saveAsParquetFile(parquet/path)

or with the new reader/writer API introduced in 1.4-SNAPSHOT:

   sqlContext.read.json(json/path).write.parquet(parquet/path)

Cheng
On 5/19/15 6:07 PM, Ewan Leith wrote:
Thanks Cheng, that makes sense.

So for new dataframe creation (not conversion from Avro but from JSON or CSV 
inputs) in Spark we shouldn't worry about using Avro at all, just use the Spark 
SQL StructType when building new Dataframes? If so, that will be a lot simpler!

Thanks,
Ewan

From: Cheng Lian [mailto:lian.cs@gmail.com]
Sent: 19 May 2015 11:01
To: Ewan Leith; user@spark.apache.orgmailto:user@spark.apache.org
Subject: Re: AvroParquetWriter equivalent in Spark 1.3 sqlContext Save or 
createDataFrame Interfaces?

Hi Ewan,

Different from AvroParquetWriter, in Spark SQL we uses StructType as the 
intermediate schema format. So when converting Avro files to Parquet files, we 
internally converts Avro schema to Spark SQL StructType first, and then convert 
StructType to Parquet schema.

Cheng
On 5/19/15 4:42 PM, Ewan Leith wrote:
Hi all,

I might be missing something, but does the new Spark 1.3 sqlContext save 
interface support using Avro as the schema structure when writing Parquet 
files, in a similar way to AvroParquetWriter (which I've got working)?

I've seen how you can load an avro file and save it as parquet from 
https://databricks.com/blog/2015/03/24/spark-sql-graduates-from-alpha-in-spark-1-3.html,
 but not using the 2 together.

Thanks, and apologies if I've missed something obvious!

Ewan




Re: Spark SQL on large number of columns

2015-05-19 Thread madhu phatak
Hi,
I am using spark 1.3.1




Regards,
Madhukara Phatak
http://datamantra.io/

On Tue, May 19, 2015 at 4:34 PM, Wangfei (X) wangf...@huawei.com wrote:

  And which version are you using

 发自我的 iPhone

 在 2015年5月19日,18:29,ayan guha guha.a...@gmail.com 写道:

   can you kindly share your code?

 On Tue, May 19, 2015 at 8:04 PM, madhu phatak phatak@gmail.com
 wrote:

 Hi,
 I  am trying run spark sql aggregation on a file with 26k columns. No of
 rows is very small. I am running into issue that spark is taking huge
 amount of time to parse the sql and create a logical plan. Even if i have
 just one row, it's taking more than 1 hour just to get pass the parsing.
 Any idea how to optimize in these kind of scenarios?


  Regards,
  Madhukara Phatak
 http://datamantra.io/




  --
 Best Regards,
 Ayan Guha




Re: Spark SQL on large number of columns

2015-05-19 Thread Wangfei (X)
And which version are you using

发自我的 iPhone

在 2015年5月19日,18:29,ayan guha 
guha.a...@gmail.commailto:guha.a...@gmail.com 写道:

can you kindly share your code?

On Tue, May 19, 2015 at 8:04 PM, madhu phatak 
phatak@gmail.commailto:phatak@gmail.com wrote:
Hi,
I  am trying run spark sql aggregation on a file with 26k columns. No of rows 
is very small. I am running into issue that spark is taking huge amount of time 
to parse the sql and create a logical plan. Even if i have just one row, it's 
taking more than 1 hour just to get pass the parsing. Any idea how to optimize 
in these kind of scenarios?


Regards,
Madhukara Phatak
http://datamantra.io/



--
Best Regards,
Ayan Guha


RE: Spark 1.3.1 Performance Tuning/Patterns for Data Generation Heavy/Throughput Jobs

2015-05-19 Thread Evo Eftimov
Is that a Spark or Spark Streaming application 

 

Re the map transformation which is required you can also try flatMap

 

Finally an Executor is essentially a JVM spawn by a Spark Worker Node or YARN – 
giving 60GB RAM to a single JVM will certainly result in “off the charts” GC. I 
would suggest to experiment with the following two things:

 

1.   Give less RAM to each Executor but have more Executor including more 
than one Executor per Node especially if the ratio RAM to CPU Cores is favorable

2.   Use Memory Serialized RDDs – this will store them still in RAM but in 
Java Object Serialized form and Spark uses Tachion for that purpose – a 
distributed In Memory File System – and it is Off the JVM Heap and hence avoids 
GC 

 

From: Night Wolf [mailto:nightwolf...@gmail.com] 
Sent: Tuesday, May 19, 2015 9:36 AM
To: user@spark.apache.org
Subject: Spark 1.3.1 Performance Tuning/Patterns for Data Generation 
Heavy/Throughput Jobs

 

Hi all,

 

I have a job that, for every row, creates about 20 new objects (i.e. RDD of 100 
rows in = RDD 2000 rows out). The reason for this is each row is tagged with a 
list of the 'buckets' or 'windows' it belongs to. 

 

The actual data is about 10 billion rows. Each executor has 60GB of memory.

 

Currently I have a mapPartitions task that is doing this object creation in a 
Scala Map and then returning the HashMap as an iterator via .toIterator. 

 

Is there a more efficient way to do this (assuming I can't use something like 
flatMap). 

 

The job runs (assuming each task size is small enough). But the GC time is 
understandably off the charts. 

 

I've reduced the spark cache memory percentage to 0.05 (as I just need space 
for a few broadcasts and this is a data churn task). I've left the shuffle 
memory percent unchanged. 

 

What kinds of settings should I be tuning with regards to GC? 

 

Looking at 
https://spark-summit.org/2014/wp-content/uploads/2015/03/SparkSummitEast2015-AdvDevOps-StudentSlides.pdf
 slide 125 recommends some settings but I'm not sure what would be best here). 
I tried using -XX:+UseG1GC but it pretty much causes my job to fail (all the 
executors die). Are there any tips with respect to the ratio of new gen and old 
gen space when creating lots of objects which will live in a data structure 
until the entire partition is processed? 

 

Any tips for tuning these kinds of jobs would be helpful!

 

Thanks,

~N

 



Re: AvroParquetWriter equivalent in Spark 1.3 sqlContext Save or createDataFrame Interfaces?

2015-05-19 Thread Cheng Lian

Hi Ewan,

Different from AvroParquetWriter, in Spark SQL we uses StructType as the 
intermediate schema format. So when converting Avro files to Parquet 
files, we internally converts Avro schema to Spark SQL StructType first, 
and then convert StructType to Parquet schema.


Cheng

On 5/19/15 4:42 PM, Ewan Leith wrote:


Hi all,

I might be missing something, but does the new Spark 1.3 sqlContext 
save interface support using Avro as the schema structure when writing 
Parquet files, in a similar way to AvroParquetWriter (which I’ve got 
working)?


I've seen how you can load an avro file and save it as parquet 
fromhttps://databricks.com/blog/2015/03/24/spark-sql-graduates-from-alpha-in-spark-1-3.html, 
but not using the 2 together.


Thanks, and apologies if I've missed something obvious!

Ewan





Spark SQL on large number of columns

2015-05-19 Thread madhu phatak
Hi,
I  am trying run spark sql aggregation on a file with 26k columns. No of
rows is very small. I am running into issue that spark is taking huge
amount of time to parse the sql and create a logical plan. Even if i have
just one row, it's taking more than 1 hour just to get pass the parsing.
Any idea how to optimize in these kind of scenarios?


Regards,
Madhukara Phatak
http://datamantra.io/


RE: AvroParquetWriter equivalent in Spark 1.3 sqlContext Save or createDataFrame Interfaces?

2015-05-19 Thread Ewan Leith
Thanks Cheng, that makes sense.

So for new dataframe creation (not conversion from Avro but from JSON or CSV 
inputs) in Spark we shouldn't worry about using Avro at all, just use the Spark 
SQL StructType when building new Dataframes? If so, that will be a lot simpler!

Thanks,
Ewan

From: Cheng Lian [mailto:lian.cs@gmail.com]
Sent: 19 May 2015 11:01
To: Ewan Leith; user@spark.apache.org
Subject: Re: AvroParquetWriter equivalent in Spark 1.3 sqlContext Save or 
createDataFrame Interfaces?

Hi Ewan,

Different from AvroParquetWriter, in Spark SQL we uses StructType as the 
intermediate schema format. So when converting Avro files to Parquet files, we 
internally converts Avro schema to Spark SQL StructType first, and then convert 
StructType to Parquet schema.

Cheng
On 5/19/15 4:42 PM, Ewan Leith wrote:
Hi all,

I might be missing something, but does the new Spark 1.3 sqlContext save 
interface support using Avro as the schema structure when writing Parquet 
files, in a similar way to AvroParquetWriter (which I've got working)?

I've seen how you can load an avro file and save it as parquet from 
https://databricks.com/blog/2015/03/24/spark-sql-graduates-from-alpha-in-spark-1-3.html,
 but not using the 2 together.

Thanks, and apologies if I've missed something obvious!

Ewan



Re: Spark SQL on large number of columns

2015-05-19 Thread madhu phatak
Hi,
I have fields from field_0 to fied_26000. The query is select on

max( cast($columnName as double)),
   |min(cast($columnName as double)), avg(cast($columnName as double)), count(*)

for all those 26000 fields in one query.





Regards,
Madhukara Phatak
http://datamantra.io/

On Tue, May 19, 2015 at 3:59 PM, ayan guha guha.a...@gmail.com wrote:

 can you kindly share your code?

 On Tue, May 19, 2015 at 8:04 PM, madhu phatak phatak@gmail.com
 wrote:

 Hi,
 I  am trying run spark sql aggregation on a file with 26k columns. No of
 rows is very small. I am running into issue that spark is taking huge
 amount of time to parse the sql and create a logical plan. Even if i have
 just one row, it's taking more than 1 hour just to get pass the parsing.
 Any idea how to optimize in these kind of scenarios?


 Regards,
 Madhukara Phatak
 http://datamantra.io/




 --
 Best Regards,
 Ayan Guha



Reading Binary files in Spark program

2015-05-19 Thread Tapan Sharma
Hi Team,

I am new to Spark and learning.
I am trying to read image files into spark job. This is how I am doing:
Step 1. Created sequence files with FileName as Key and Binary image as
value. i.e.  Text and BytesWritable.
I am able to read these sequence files into Map Reduce programs.

Step 2.
I understand that Text and BytesWritable are Non Serializable therefore, I
read the sequence file in Spark as following:

SparkConf sparkConf = new SparkConf().setAppName(JavaSequenceFile);
JavaSparkContext ctx = new JavaSparkContext(sparkConf);
JavaPairRDDString, Byte seqFiles = ctx.sequenceFile(args[0],
String.class, Byte.class) ;
final ListTuple2lt;String, Byte tuple2s = seqFiles.collect();

   

The moment I try to call collect() method to get the keys of sequence file,
following exception has been thrown

Can any one help me understanding why collect() method is failing? If I use
toArray() on seqFiles object then also I am getting same call stack.

Regards
Tapan



java.io.NotSerializableException: org.apache.hadoop.io.Text
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
at
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
at 
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
at
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1377)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1173)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
at
org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
at
org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:73)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:206)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
2015-05-19 15:15:03,705 ERROR [task-result-getter-0]
scheduler.TaskSetManager (Logging.scala:logError(75)) - Task 0.0 in stage
0.0 (TID 0) had a not serializable result: org.apache.hadoop.io.Text; not
retrying
2015-05-19 15:15:03,731 INFO  [task-result-getter-0]
scheduler.TaskSchedulerImpl (Logging.scala:logInfo(59)) - Removed TaskSet
0.0, whose tasks have all completed, from pool 
2015-05-19 15:15:03,739 INFO  [sparkDriver-akka.actor.default-dispatcher-2]
scheduler.TaskSchedulerImpl (Logging.scala:logInfo(59)) - Cancelling stage 0
2015-05-19 15:15:03,747 INFO  [main] scheduler.DAGScheduler
(Logging.scala:logInfo(59)) - Job 0 failed: collect at
JavaSequenceFile.java:44, took 4.421397 s
Exception in thread main org.apache.spark.SparkException: Job aborted due
to stage failure: Task 0.0 in stage 0.0 (TID 0) had a not serializable
result: org.apache.hadoop.io.Text
at
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1214)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1203)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1202)
at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1202)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
at scala.Option.foreach(Option.scala:236)
at
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:696)
at
org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1420)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
at akka.actor.ActorCell.invoke(ActorCell.scala:456)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
at akka.dispatch.Mailbox.run(Mailbox.scala:219)
at
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at 
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)






--
View this message in context: 

Re: Spark SQL on large number of columns

2015-05-19 Thread ayan guha
can you kindly share your code?

On Tue, May 19, 2015 at 8:04 PM, madhu phatak phatak@gmail.com wrote:

 Hi,
 I  am trying run spark sql aggregation on a file with 26k columns. No of
 rows is very small. I am running into issue that spark is taking huge
 amount of time to parse the sql and create a logical plan. Even if i have
 just one row, it's taking more than 1 hour just to get pass the parsing.
 Any idea how to optimize in these kind of scenarios?


 Regards,
 Madhukara Phatak
 http://datamantra.io/




-- 
Best Regards,
Ayan Guha


Re: Spark SQL on large number of columns

2015-05-19 Thread madhu phatak
Hi,
An additional information is,  table is backed by a csv file which is read
using spark-csv from databricks.




Regards,
Madhukara Phatak
http://datamantra.io/

On Tue, May 19, 2015 at 4:05 PM, madhu phatak phatak@gmail.com wrote:

 Hi,
 I have fields from field_0 to fied_26000. The query is select on

 max( cast($columnName as double)),
|min(cast($columnName as double)), avg(cast($columnName as double)), 
 count(*)

 for all those 26000 fields in one query.





 Regards,
 Madhukara Phatak
 http://datamantra.io/

 On Tue, May 19, 2015 at 3:59 PM, ayan guha guha.a...@gmail.com wrote:

 can you kindly share your code?

 On Tue, May 19, 2015 at 8:04 PM, madhu phatak phatak@gmail.com
 wrote:

 Hi,
 I  am trying run spark sql aggregation on a file with 26k columns. No of
 rows is very small. I am running into issue that spark is taking huge
 amount of time to parse the sql and create a logical plan. Even if i have
 just one row, it's taking more than 1 hour just to get pass the parsing.
 Any idea how to optimize in these kind of scenarios?


 Regards,
 Madhukara Phatak
 http://datamantra.io/




 --
 Best Regards,
 Ayan Guha





Re: How to use spark to access HBase with Security enabled

2015-05-19 Thread Ted Yu
Which user did you run your program as ?

Have you granted proper permission on hbase side ?

You should also check master log to see if there was some clue. 

Cheers



 On May 19, 2015, at 2:41 AM, donhoff_h 165612...@qq.com wrote:
 
 Hi, experts.
 
 I ran the HBaseTest program which is an example from the Apache Spark 
 source code to learn how to use spark to access HBase. But I met the 
 following exception:
 Exception in thread main 
 org.apache.hadoop.hbase.client.RetriesExhaustedException: Failed after 
 attempts=36, exceptions:
 Tue May 19 16:59:11 CST 2015, null, java.net.SocketTimeoutException: 
 callTimeout=6, callDuration=68648: row 'spark_t01,,00' on 
 table 'hbase:meta' at region=hbase:meta,,1.1588230740, 
 hostname=bgdt01.dev.hrb,16020,1431412877700, seqNum=0
 
 I also checked the RegionServer Log of the host bgdt01.dev.hrb listed in 
 the above exception. I found a few entries like the following one:
 2015-05-19 16:59:11,143 DEBUG 
 [RpcServer.reader=2,bindAddress=bgdt01.dev.hrb,port=16020] ipc.RpcServer: 
 RpcServer.listener,port=16020: Caught exception while reading:Authentication 
 is required 
 
 The above entry did not point to my program clearly. But the time is very 
 near. Since my hbase version is HBase1.0.0 and I set security enabled, I 
 doubt the exception was caused by the Kerberos authentication.  But I am not 
 sure.
 
 Do anybody know if my guess is right? And if I am right, could anybody tell 
 me how to set Kerberos Authentication in a spark program? I don't know how to 
 do it. I already checked the API doc , but did not found any API useful. Many 
 Thanks!
 
 By the way, my spark version is 1.3.0. I also paste the code of HBaseTest 
 in the following:
 ***Source Code**
 object HBaseTest {
   def main(args: Array[String]) {
 val sparkConf = new SparkConf().setAppName(HBaseTest)
 val sc = new SparkContext(sparkConf)
 val conf = HBaseConfiguration.create()
 conf.set(TableInputFormat.INPUT_TABLE, args(0))
 
 // Initialize hBase table if necessary
 val admin = new HBaseAdmin(conf)
 if (!admin.isTableAvailable(args(0))) {
   val tableDesc = new HTableDescriptor(args(0))
   admin.createTable(tableDesc)
 }
 
 val hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
   classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
   classOf[org.apache.hadoop.hbase.client.Result])
 
 hBaseRDD.count()
 
 sc.stop()
   }
 }
 


Re: Spark Job not using all nodes in cluster

2015-05-19 Thread ayan guha
What is your spark env file says? Are you setting number of executors in
spark context?
On 20 May 2015 13:16, Shailesh Birari sbirar...@gmail.com wrote:

 Hi,

 I have a 4 node Spark 1.3.1 cluster. All four nodes have 4 cores and 64 GB
 of RAM.
 I have around 600,000+ Json files on HDFS. Each file is small around 1KB in
 size. Total data is around 16GB. Hadoop block size is 256MB.
 My application reads these files with sc.textFile() (or sc.jsonFile()
 tried
 both) API. But all the files are getting read by only one node (4
 executors). Spark UI shows all 600K+ tasks on one node and 0 on other
 nodes.

 I confirmed that all files are accessible from all nodes. Some other
 application which uses big files uses all nodes on same cluster.

 Can you please let me know why it is behaving in such way ?

 Thanks,
   Shailesh




 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Job-not-using-all-nodes-in-cluster-tp22951.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Spark logo license

2015-05-19 Thread Matei Zaharia
Check out Apache's trademark guidelines here: 
http://www.apache.org/foundation/marks/ 
http://www.apache.org/foundation/marks/

Matei

 On May 20, 2015, at 12:02 AM, Justin Pihony justin.pih...@gmail.com wrote:
 
 What is the license on using the spark logo. Is it free to be used for
 displaying commercially?
 
 
 
 
 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-logo-license-tp22952.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.
 
 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org
 



Re: Spark logo license

2015-05-19 Thread Justin Pihony
Thanks!

On Wed, May 20, 2015 at 12:41 AM, Matei Zaharia matei.zaha...@gmail.com
wrote:

 Check out Apache's trademark guidelines here:
 http://www.apache.org/foundation/marks/

 Matei

 On May 20, 2015, at 12:02 AM, Justin Pihony justin.pih...@gmail.com
 wrote:

 What is the license on using the spark logo. Is it free to be used for
 displaying commercially?




 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-logo-license-tp22952.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org





Hive on Spark VS Spark SQL

2015-05-19 Thread guoqing0...@yahoo.com.hk
Hive on Spark and SparkSQL which should be better , and what are the key 
characteristics and the advantages and the disadvantages between ?



guoqing0...@yahoo.com.hk


Re: Spark Streaming to Kafka

2015-05-19 Thread Saisai Shao
I think here is the PR https://github.com/apache/spark/pull/2994 you could
refer to.

2015-05-20 13:41 GMT+08:00 twinkle sachdeva twinkle.sachd...@gmail.com:

 Hi,

 As Spark streaming is being nicely integrated with consuming messages from
 Kafka, so I thought of asking the forum, that is there any implementation
 available for pushing data to Kafka from Spark Streaming too?

 Any link(s) will be helpful.

 Thanks and Regards,
 Twinkle



Re: Reading Binary files in Spark program

2015-05-19 Thread Tapan Sharma
Thanks. I will try and let you know. But what exactly is an issue? Any
pointers?

Regards
Tapan

On Tue, May 19, 2015 at 6:26 PM, Akhil Das ak...@sigmoidanalytics.com
wrote:

 Try something like:

 JavaPairRDDIntWritable, Text output = sc.newAPIHadoopFile(inputDir,
   org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat.class,
 IntWritable.class,
   Text.class, new Job().getConfiguration());

 With the type of input format that you require.

 Thanks
 Best Regards

 On Tue, May 19, 2015 at 3:57 PM, Tapan Sharma tapan.sha...@gmail.com
 wrote:

 Hi Team,

 I am new to Spark and learning.
 I am trying to read image files into spark job. This is how I am doing:
 Step 1. Created sequence files with FileName as Key and Binary image as
 value. i.e.  Text and BytesWritable.
 I am able to read these sequence files into Map Reduce programs.

 Step 2.
 I understand that Text and BytesWritable are Non Serializable therefore, I
 read the sequence file in Spark as following:

 SparkConf sparkConf = new SparkConf().setAppName(JavaSequenceFile);
 JavaSparkContext ctx = new JavaSparkContext(sparkConf);
 JavaPairRDDString, Byte seqFiles = ctx.sequenceFile(args[0],
 String.class, Byte.class) ;
 final ListTuple2lt;String, Byte tuple2s = seqFiles.collect();




 The moment I try to call collect() method to get the keys of sequence
 file,
 following exception has been thrown

 Can any one help me understanding why collect() method is failing? If I
 use
 toArray() on seqFiles object then also I am getting same call stack.

 Regards
 Tapan



 java.io.NotSerializableException: org.apache.hadoop.io.Text
 at
 java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
 at

 java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
 at
 java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
 at

 java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
 at
 java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
 at
 java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1377)
 at
 java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1173)
 at
 java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
 at

 org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
 at

 org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:73)
 at
 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:206)
 at

 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
 at

 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 at java.lang.Thread.run(Thread.java:745)
 2015-05-19 15:15:03,705 ERROR [task-result-getter-0]
 scheduler.TaskSetManager (Logging.scala:logError(75)) - Task 0.0 in stage
 0.0 (TID 0) had a not serializable result: org.apache.hadoop.io.Text; not
 retrying
 2015-05-19 15:15:03,731 INFO  [task-result-getter-0]
 scheduler.TaskSchedulerImpl (Logging.scala:logInfo(59)) - Removed TaskSet
 0.0, whose tasks have all completed, from pool
 2015-05-19 15:15:03,739 INFO
 [sparkDriver-akka.actor.default-dispatcher-2]
 scheduler.TaskSchedulerImpl (Logging.scala:logInfo(59)) - Cancelling
 stage 0
 2015-05-19 15:15:03,747 INFO  [main] scheduler.DAGScheduler
 (Logging.scala:logInfo(59)) - Job 0 failed: collect at
 JavaSequenceFile.java:44, took 4.421397 s
 Exception in thread main org.apache.spark.SparkException: Job aborted
 due
 to stage failure: Task 0.0 in stage 0.0 (TID 0) had a not serializable
 result: org.apache.hadoop.io.Text
 at
 org.apache.spark.scheduler.DAGScheduler.org
 $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1214)
 at

 org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1203)
 at

 org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1202)
 at

 scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
 at
 scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
 at

 org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1202)
 at

 org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
 at

 org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
 at scala.Option.foreach(Option.scala:236)
 at

 org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:696)
 at

 org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1420)
 at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
 at akka.actor.ActorCell.invoke(ActorCell.scala:456)
   

Spark Streaming to Kafka

2015-05-19 Thread twinkle sachdeva
Hi,

As Spark streaming is being nicely integrated with consuming messages from
Kafka, so I thought of asking the forum, that is there any implementation
available for pushing data to Kafka from Spark Streaming too?

Any link(s) will be helpful.

Thanks and Regards,
Twinkle


sparkSQL - Hive metastore connection hangs with MS SQL server

2015-05-19 Thread jamborta
Hi all,

I am trying to setup an external metastore using Microsoft SQL on Azure, it
works ok initially but after about 5 mins inactivity it hangs, then times
out after 15 mins with this error:

15/05/20 00:02:49 ERROR ConnectionHandle: Database access problem. Killing
off this connection and all remaining connections in the connection pool.
SQL State = 08S01
15/05/20 00:02:49 ERROR RetryingHMSHandler: Retrying HMSHandler after 1000
ms (attempt 1 of 1) with error: javax.jdo.JDODataStoreException: SQL Server
did not return a 
response. The connection has been closed.
at
org.datanucleus.api.jdo.NucleusJDOHelper.getJDOExceptionForNucleusException(NucleusJDOHelper.java:451)
at org.datanucleus.api.jdo.JDOQuery.execute(JDOQuery.java:275)
at
org.apache.hadoop.hive.metastore.ObjectStore.getMTable(ObjectStore.java:901)
at
org.apache.hadoop.hive.metastore.ObjectStore.getTable(ObjectStore.java:833)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
...
NestedThrowablesStackTrace:
com.microsoft.sqlserver.jdbc.SQLServerException: SQL Server did not return a
response. The connection has been closed.
at
com.microsoft.sqlserver.jdbc.SQLServerConnection.terminate(SQLServerConnection.java:1668)
at
com.microsoft.sqlserver.jdbc.SQLServerConnection.terminate(SQLServerConnection.java:1655)
at
com.microsoft.sqlserver.jdbc.TDSReader.readPacket(IOBuffer.java:4844)

I have also tried replacing BoneCP with DBCP in
datanucleus.connectionPoolingType, that didn't help either. 




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/sparkSQL-Hive-metastore-connection-hangs-with-MS-SQL-server-tp22950.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Spark users

2015-05-19 Thread Ricardo Goncalves da Silva
Hi
I'm learning spark focused on data and machine learning. Migrating from SAS.

There is a group for it? My questions are basic for now and I having very few 
answers.

Tal

Rick.



Enviado do meu smartphone Samsung Galaxy.



Este mensaje y sus adjuntos se dirigen exclusivamente a su destinatario, puede 
contener información privilegiada o confidencial y es para uso exclusivo de la 
persona o entidad de destino. Si no es usted. el destinatario indicado, queda 
notificado de que la lectura, utilización, divulgación y/o copia sin 
autorización puede estar prohibida en virtud de la legislación vigente. Si ha 
recibido este mensaje por error, le rogamos que nos lo comunique inmediatamente 
por esta misma vía y proceda a su destrucción.

The information contained in this transmission is privileged and confidential 
information intended only for the use of the individual or entity named above. 
If the reader of this message is not the intended recipient, you are hereby 
notified that any dissemination, distribution or copying of this communication 
is strictly prohibited. If you have received this transmission in error, do not 
read it. Please immediately reply to the sender that you have received this 
communication in error and then delete it.

Esta mensagem e seus anexos se dirigem exclusivamente ao seu destinatário, pode 
conter informação privilegiada ou confidencial e é para uso exclusivo da pessoa 
ou entidade de destino. Se não é vossa senhoria o destinatário indicado, fica 
notificado de que a leitura, utilização, divulgação e/ou cópia sem autorização 
pode estar proibida em virtude da legislação vigente. Se recebeu esta mensagem 
por erro, rogamos-lhe que nos o comunique imediatamente por esta mesma via e 
proceda a sua destruição


?????? How to use spark to access HBase with Security enabled

2015-05-19 Thread donhoff_h
Sorry, this ref does not help me.  I have set up the configuration in 
hbase-site.xml. But it seems there are still some extra configurations to be 
set or APIs to be called to make my spark program be able to pass the 
authentication with the HBase.

Does anybody know how to set authentication to a secured HBase in a spark 
program which use the API newAPIHadoopRDD to get information from HBase?

Many Thanks!



--  --
??: yuzhihong;yuzhih...@gmail.com;
: 2015??5??19??(??) 9:54
??: donhoff_h165612...@qq.com; 
: useruser@spark.apache.org; 
: Re: How to use spark to access HBase with Security enabled



Please take a look at:
http://hbase.apache.org/book.html#_client_side_configuration_for_secure_operation



Cheers


On Tue, May 19, 2015 at 5:23 AM, donhoff_h 165612...@qq.com wrote:


The principal is sp...@bgdt.dev.hrb. It is the user that I used to run my spark 
programs. I am sure I have run the kinit command to make it take effect. And I 
also used the HBase Shell to verify that this user has the right to scan and 
put the tables in HBase.


Now I still have no idea how to solve this problem. Can anybody help me to 
figure it out? Many Thanks!


--  --
??: yuzhihong;yuzhih...@gmail.com;
: 2015??5??19??(??) 7:55
??: donhoff_h165612...@qq.com; 
: useruser@spark.apache.org; 
: Re: How to use spark to access HBase with Security enabled



Which user did you run your program as ?


Have you granted proper permission on hbase side ?


You should also check master log to see if there was some clue. 


Cheers




On May 19, 2015, at 2:41 AM, donhoff_h 165612...@qq.com wrote:


Hi, experts.

I ran the HBaseTest program which is an example from the Apache Spark source 
code to learn how to use spark to access HBase. But I met the following 
exception:
Exception in thread main 
org.apache.hadoop.hbase.client.RetriesExhaustedException: Failed after 
attempts=36, exceptions:
Tue May 19 16:59:11 CST 2015, null, java.net.SocketTimeoutException: 
callTimeout=6, callDuration=68648: row 'spark_t01,,00' on table 
'hbase:meta' at region=hbase:meta,,1.1588230740, 
hostname=bgdt01.dev.hrb,16020,1431412877700, seqNum=0

I also checked the RegionServer Log of the host bgdt01.dev.hrb listed in the 
above exception. I found a few entries like the following one:
2015-05-19 16:59:11,143 DEBUG 
[RpcServer.reader=2,bindAddress=bgdt01.dev.hrb,port=16020] ipc.RpcServer: 
RpcServer.listener,port=16020: Caught exception while reading:Authentication is 
required 

The above entry did not point to my program clearly. But the time is very near. 
Since my hbase version is HBase1.0.0 and I set security enabled, I doubt the 
exception was caused by the Kerberos authentication.  But I am not sure.

Do anybody know if my guess is right? And if I am right, could anybody tell me 
how to set Kerberos Authentication in a spark program? I don't know how to do 
it. I already checked the API doc , but did not found any API useful. Many 
Thanks!

By the way, my spark version is 1.3.0. I also paste the code of HBaseTest in 
the following:
***Source Code**
object HBaseTest {
  def main(args: Array[String]) {
val sparkConf = new SparkConf().setAppName(HBaseTest)
val sc = new SparkContext(sparkConf)
val conf = HBaseConfiguration.create()
conf.set(TableInputFormat.INPUT_TABLE, args(0))

// Initialize hBase table if necessary
val admin = new HBaseAdmin(conf)
if (!admin.isTableAvailable(args(0))) {
  val tableDesc = new HTableDescriptor(args(0))
  admin.createTable(tableDesc)
}


val hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
  classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
  classOf[org.apache.hadoop.hbase.client.Result])


hBaseRDD.count()


sc.stop()
  }
}

RE: Spark sql error while writing Parquet file- Trying to write more fields than contained in row

2015-05-19 Thread Chandra Mohan, Ananda Vel Murugan
Hi,

Thanks for the response. I was looking for a java solution. I will check the 
scala and python ones.

Regards,
Anand.C

From: Todd Nist [mailto:tsind...@gmail.com]
Sent: Tuesday, May 19, 2015 6:17 PM
To: Chandra Mohan, Ananda Vel Murugan
Cc: ayan guha; user
Subject: Re: Spark sql error while writing Parquet file- Trying to write more 
fields than contained in row

I believe your looking for  df.na.fill in scala, in pySpark Module it is fillna 
(http://spark.apache.org/docs/latest/api/python/pyspark.sql.html)

from the docs:

df4.fillna({'age': 50, 'name': 'unknown'}).show()

age height name

10  80 Alice

5   null   Bob

50  null   Tom

50  null   unknown

On Mon, May 18, 2015 at 11:01 PM, Chandra Mohan, Ananda Vel Murugan 
ananda.muru...@honeywell.commailto:ananda.muru...@honeywell.com wrote:
Hi,

Thanks for the response. But I could not see fillna function in DataFrame class.

[cid:image001.png@01D092DA.4DF87A00]


Is it available in some specific version of Spark sql. This is what I have in 
my pom.xml

dependency
  groupIdorg.apache.spark/groupId
  artifactIdspark-sql_2.10/artifactId
  version1.3.1/version
   /dependency

Regards,
Anand.C

From: ayan guha [mailto:guha.a...@gmail.commailto:guha.a...@gmail.com]
Sent: Monday, May 18, 2015 5:19 PM
To: Chandra Mohan, Ananda Vel Murugan; user
Subject: Re: Spark sql error while writing Parquet file- Trying to write more 
fields than contained in row

Hi

Give a try with dtaFrame.fillna function to fill up missing column

Best
Ayan

On Mon, May 18, 2015 at 8:29 PM, Chandra Mohan, Ananda Vel Murugan 
ananda.muru...@honeywell.commailto:ananda.muru...@honeywell.com wrote:
Hi,

I am using spark-sql to read a CSV file and write it as parquet file. I am 
building the schema using the following code.

String schemaString = a b c;
   ListStructField fields = new ArrayListStructField();
   MetadataBuilder mb = new MetadataBuilder();
   mb.putBoolean(nullable, true);
   Metadata m = mb.build();
   for (String fieldName: schemaString.split( )) {
fields.add(new StructField(fieldName,DataTypes.DoubleType,true, 
m));
   }
   StructType schema = DataTypes.createStructType(fields);

Some of the rows in my input csv does not contain three columns. After building 
my JavaRDDRow, I create data frame as shown below using the RDD and schema.

DataFrame darDataFrame = sqlContext.createDataFrame(rowRDD, schema);

Finally I try to save it as Parquet file

darDataFrame.saveAsParquetFile(/home/anand/output.parquet”)

I get this error when saving it as Parquet file

java.lang.IndexOutOfBoundsException: Trying to write more fields than contained 
in row (3  2)

I understand the reason behind this error. Some of my rows in Row RDD does not 
contain three elements as some rows in my input csv does not contain three 
columns. But while building the schema, I am specifying every field as 
nullable. So I believe, it should not throw this error. Can anyone help me fix 
this error. Thank you.

Regards,
Anand.C





--
Best Regards,
Ayan Guha



Spark Job not using all nodes in cluster

2015-05-19 Thread Shailesh Birari
Hi,

I have a 4 node Spark 1.3.1 cluster. All four nodes have 4 cores and 64 GB
of RAM.
I have around 600,000+ Json files on HDFS. Each file is small around 1KB in
size. Total data is around 16GB. Hadoop block size is 256MB.
My application reads these files with sc.textFile() (or sc.jsonFile()  tried
both) API. But all the files are getting read by only one node (4
executors). Spark UI shows all 600K+ tasks on one node and 0 on other nodes.

I confirmed that all files are accessible from all nodes. Some other
application which uses big files uses all nodes on same cluster.

Can you please let me know why it is behaving in such way ?

Thanks,
  Shailesh




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Job-not-using-all-nodes-in-cluster-tp22951.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Reading Binary files in Spark program

2015-05-19 Thread Tapan Sharma
Problem is still there.
Exception is not coming at the time of reading.
Also the count of JavaPairRDD is as expected. It is when we are calling
collect() or toArray() methods, the exception is coming.
Something to do with Text class even though I haven't used it in the
program.

Regards
Tapan

On Tue, May 19, 2015 at 6:26 PM, Akhil Das ak...@sigmoidanalytics.com
wrote:

 Try something like:

 JavaPairRDDIntWritable, Text output = sc.newAPIHadoopFile(inputDir,
   org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat.class,
 IntWritable.class,
   Text.class, new Job().getConfiguration());

 With the type of input format that you require.

 Thanks
 Best Regards

 On Tue, May 19, 2015 at 3:57 PM, Tapan Sharma tapan.sha...@gmail.com
 wrote:

 Hi Team,

 I am new to Spark and learning.
 I am trying to read image files into spark job. This is how I am doing:
 Step 1. Created sequence files with FileName as Key and Binary image as
 value. i.e.  Text and BytesWritable.
 I am able to read these sequence files into Map Reduce programs.

 Step 2.
 I understand that Text and BytesWritable are Non Serializable therefore, I
 read the sequence file in Spark as following:

 SparkConf sparkConf = new SparkConf().setAppName(JavaSequenceFile);
 JavaSparkContext ctx = new JavaSparkContext(sparkConf);
 JavaPairRDDString, Byte seqFiles = ctx.sequenceFile(args[0],
 String.class, Byte.class) ;
 final ListTuple2lt;String, Byte tuple2s = seqFiles.collect();




 The moment I try to call collect() method to get the keys of sequence
 file,
 following exception has been thrown

 Can any one help me understanding why collect() method is failing? If I
 use
 toArray() on seqFiles object then also I am getting same call stack.

 Regards
 Tapan



 java.io.NotSerializableException: org.apache.hadoop.io.Text
 at
 java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
 at

 java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
 at
 java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
 at

 java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
 at
 java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
 at
 java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1377)
 at
 java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1173)
 at
 java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
 at

 org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
 at

 org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:73)
 at
 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:206)
 at

 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
 at

 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 at java.lang.Thread.run(Thread.java:745)
 2015-05-19 15:15:03,705 ERROR [task-result-getter-0]
 scheduler.TaskSetManager (Logging.scala:logError(75)) - Task 0.0 in stage
 0.0 (TID 0) had a not serializable result: org.apache.hadoop.io.Text; not
 retrying
 2015-05-19 15:15:03,731 INFO  [task-result-getter-0]
 scheduler.TaskSchedulerImpl (Logging.scala:logInfo(59)) - Removed TaskSet
 0.0, whose tasks have all completed, from pool
 2015-05-19 15:15:03,739 INFO
 [sparkDriver-akka.actor.default-dispatcher-2]
 scheduler.TaskSchedulerImpl (Logging.scala:logInfo(59)) - Cancelling
 stage 0
 2015-05-19 15:15:03,747 INFO  [main] scheduler.DAGScheduler
 (Logging.scala:logInfo(59)) - Job 0 failed: collect at
 JavaSequenceFile.java:44, took 4.421397 s
 Exception in thread main org.apache.spark.SparkException: Job aborted
 due
 to stage failure: Task 0.0 in stage 0.0 (TID 0) had a not serializable
 result: org.apache.hadoop.io.Text
 at
 org.apache.spark.scheduler.DAGScheduler.org
 $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1214)
 at

 org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1203)
 at

 org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1202)
 at

 scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
 at
 scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
 at

 org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1202)
 at

 org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
 at

 org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
 at scala.Option.foreach(Option.scala:236)
 at

 org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:696)
 at

 

Re: EOFException using KryoSerializer

2015-05-19 Thread Imran Rashid
Hi Jim,

this is definitley strange.  It sure sounds like a bug, but it also is a
very commonly used code path, so it at the very least you must be hitting a
corner case.  Could you share a little more info with us?  What version of
spark are you using?  How big is the object you are trying to broadcast?
Can you share more of the logs from before the exception?

It is not too surprising this shows up in mesos but not in local mode.
Local mode never exercises the part of the code that needs to deserialize
the blocks of a broadcast variables (though it actually does serialize the
data into blocks).  So I doubt its mesos specific, more likely it would
happen in any cluster mode -- yarn, standalone, or even local-cluster (a
pseudo-cluster just for testing).

Imran

On Tue, May 19, 2015 at 3:56 PM, Jim Carroll jimfcarr...@gmail.com wrote:

 I'm seeing the following exception ONLY when I run on a Mesos cluster. If I
 run the exact same code with master set to local[N] I have no problem:

  2015-05-19 16:45:43,484 [task-result-getter-0] WARN  TaskSetManager - Lost
 task 0.0 in stage 0.0 (TID 0, 10.253.1.101): java.io.EOFException
 at

 org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:142)
 at

 org.apache.spark.broadcast.TorrentBroadcast$.unBlockifyObject(TorrentBroadcast.scala:216)
 at

 org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:177)
 at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1153)
 at

 org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:164)
 at

 org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:64)
 at

 org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:64)
 at

 org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:87)
 at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
 at
 org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:58)
 at org.apache.spark.scheduler.Task.run(Task.scala:64)
 at
 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
 at

 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
 at

 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
 at java.lang.Thread.run(Thread.java:745)

 KryoSerializer explicitly throws an EOFException. The comment says:

 // DeserializationStream uses the EOF exception to indicate stopping
 condition.

 Apparently this isn't what TorrentBroadcast expects.

 Any suggestions? Thanks.

 Jim





 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/EOFException-using-KryoSerializer-tp22948.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




spark 1.3.1 jars in repo1.maven.org

2015-05-19 Thread Edward Sargisson
Hi,
I'd like to confirm an observation I've just made. Specifically that spark
is only available in repo1.maven.org for one Hadoop variant.

The Spark source can be compiled against a number of different Hadoops
using profiles. Yay.
However, the spark jars in repo1.maven.org appear to be compiled against
one specific Hadoop and no other differentiation is made. (I can see a
difference with hadoop-client being 2.2.0 in repo1.maven.org and 1.0.4 in
the version I compiled locally).

The implication here is that if you have a pom file asking for
spark-core_2.10 version 1.3.1 then Maven will only give you an Hadoop 2
version. Maven assumes that non-snapshot artifacts never change so trying
to load an Hadoop 1 version will end in tears.

This then means that if you compile code against spark-core then there will
probably be classpath NoClassDefFound issues unless the Hadoop 2 version is
exactly the one you want.

Have I gotten this correct?

It happens that our little app is using a Spark context directly from a
Jetty webapp and the classpath differences were/are causing some confusion.
We are currently installing a Hadoop 1 spark master and worker.

Thanks a lot!
Edward


Re: spark 1.3.1 jars in repo1.maven.org

2015-05-19 Thread Ted Yu
I think your observation is correct.
e.g.
http://mvnrepository.com/artifact/org.apache.spark/spark-core_2.10/1.3.1
shows that it depends on hadoop-client
http://mvnrepository.com/artifact/org.apache.hadoop/hadoop-client from
hadoop 2.2

Cheers

On Tue, May 19, 2015 at 6:17 PM, Edward Sargisson esa...@pobox.com wrote:

 Hi,
 I'd like to confirm an observation I've just made. Specifically that spark
 is only available in repo1.maven.org for one Hadoop variant.

 The Spark source can be compiled against a number of different Hadoops
 using profiles. Yay.
 However, the spark jars in repo1.maven.org appear to be compiled against
 one specific Hadoop and no other differentiation is made. (I can see a
 difference with hadoop-client being 2.2.0 in repo1.maven.org and 1.0.4 in
 the version I compiled locally).

 The implication here is that if you have a pom file asking for
 spark-core_2.10 version 1.3.1 then Maven will only give you an Hadoop 2
 version. Maven assumes that non-snapshot artifacts never change so trying
 to load an Hadoop 1 version will end in tears.

 This then means that if you compile code against spark-core then there
 will probably be classpath NoClassDefFound issues unless the Hadoop 2
 version is exactly the one you want.

 Have I gotten this correct?

 It happens that our little app is using a Spark context directly from a
 Jetty webapp and the classpath differences were/are causing some confusion.
 We are currently installing a Hadoop 1 spark master and worker.

 Thanks a lot!
 Edward



Spark logo license

2015-05-19 Thread Justin Pihony
What is the license on using the spark logo. Is it free to be used for
displaying commercially?




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-logo-license-tp22952.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Find KNN in Spark SQL

2015-05-19 Thread Debasish Das
The batch version of this is part of rowSimilarities JIRA 4823 ...if your
query points can fit in memory there is broadcast version which we are
experimenting with internallywe are using brute force KNN right now in
the PR...based on flann paper lsh did not work well but before you go to
approximate knn you have to make sure your topk precision/recall is not
degrading as compared to brute force in your cv flow...

I have not yet extracted knn model but that will use the IndexedRowMatrix
changes that we put in the PR
On May 19, 2015 12:58 PM, Xiangrui Meng men...@gmail.com wrote:

 Spark SQL doesn't provide spatial features. Large-scale KNN is usually
 combined with locality-sensitive hashing (LSH). This Spark package may
 be helpful: http://spark-packages.org/package/mrsqueeze/spark-hash.
 -Xiangrui

 On Sat, May 9, 2015 at 9:25 PM, Dong Li lid...@lidong.net.cn wrote:
  Hello experts,
 
  I’m new to Spark, and want to find K nearest neighbors on huge scale
 high-dimension points dataset in very short time.
 
  The scenario is: the dataset contains more than 10 million points, whose
 dimension is 200d. I’m building a web service, to receive one new point at
 each request and return K nearest points inside that dataset, also need to
 ensure the time-cost not very high. I have a cluster with several
 high-memory nodes for this service.
 
  Currently I only have these ideas here:
  1. To create several ball-tree instances in each node when service
 initializing. This is fast, but not perform well at data scaling ability. I
 cannot insert new nodes to the ball-trees unless I restart the services and
 rebuild them.
  2. To use sql based solution. Some database like PostgreSQL and
 SqlServer have features on spatial search. But these database may not
 perform well in big data environment. (Does SparkSQL have Spatial features
 or spatial index?)
 
  Based on your experience, can I achieve this scenario in Spark SQL? Or
 do you know other projects in Spark stack acting well for this?
  Any ideas are appreciated, thanks very much.
 
  Regards,
  Dong
 
 
 
 
  -
  To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
  For additional commands, e-mail: user-h...@spark.apache.org
 

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Spark SQL on large number of columns

2015-05-19 Thread madhu phatak
Hi,
Tested for calculating values for 300 columns. Analyser takes around 4
minutes to generate the plan. Is this normal?




Regards,
Madhukara Phatak
http://datamantra.io/

On Tue, May 19, 2015 at 4:35 PM, madhu phatak phatak@gmail.com wrote:

 Hi,
 I am using spark 1.3.1




 Regards,
 Madhukara Phatak
 http://datamantra.io/

 On Tue, May 19, 2015 at 4:34 PM, Wangfei (X) wangf...@huawei.com wrote:

  And which version are you using

 发自我的 iPhone

 在 2015年5月19日,18:29,ayan guha guha.a...@gmail.com 写道:

   can you kindly share your code?

 On Tue, May 19, 2015 at 8:04 PM, madhu phatak phatak@gmail.com
 wrote:

 Hi,
 I  am trying run spark sql aggregation on a file with 26k columns. No of
 rows is very small. I am running into issue that spark is taking huge
 amount of time to parse the sql and create a logical plan. Even if i have
 just one row, it's taking more than 1 hour just to get pass the parsing.
 Any idea how to optimize in these kind of scenarios?


  Regards,
  Madhukara Phatak
 http://datamantra.io/




  --
 Best Regards,
 Ayan Guha





Re: Spark sql error while writing Parquet file- Trying to write more fields than contained in row

2015-05-19 Thread Todd Nist
I believe your looking for  df.na.fill in scala, in pySpark Module it is
fillna (http://spark.apache.org/docs/latest/api/python/pyspark.sql.html)

from the docs:

df4.fillna({'age': 50, 'name': 'unknown'}).show()age height name10  80
Alice5   null   Bob50  null   Tom50  null   unknown


On Mon, May 18, 2015 at 11:01 PM, Chandra Mohan, Ananda Vel Murugan 
ananda.muru...@honeywell.com wrote:

  Hi,



 Thanks for the response. But I could not see fillna function in DataFrame
 class.







 Is it available in some specific version of Spark sql. This is what I have
 in my pom.xml



 dependency

   groupIdorg.apache.spark/groupId

   artifactIdspark-sql_2.10/artifactId

   version1.3.1/version

/dependency



 Regards,

 Anand.C



 *From:* ayan guha [mailto:guha.a...@gmail.com]
 *Sent:* Monday, May 18, 2015 5:19 PM
 *To:* Chandra Mohan, Ananda Vel Murugan; user
 *Subject:* Re: Spark sql error while writing Parquet file- Trying to
 write more fields than contained in row



 Hi



 Give a try with dtaFrame.fillna function to fill up missing column



 Best

 Ayan



 On Mon, May 18, 2015 at 8:29 PM, Chandra Mohan, Ananda Vel Murugan 
 ananda.muru...@honeywell.com wrote:

 Hi,



 I am using spark-sql to read a CSV file and write it as parquet file. I am
 building the schema using the following code.



 String schemaString = "a b c";

 List<StructField> fields = new ArrayList<StructField>();
 MetadataBuilder mb = new MetadataBuilder();
 mb.putBoolean("nullable", true);
 Metadata m = mb.build();

 for (String fieldName : schemaString.split(" ")) {
     fields.add(new StructField(fieldName, DataTypes.DoubleType, true, m));
 }

 StructType schema = DataTypes.createStructType(fields);



 Some of the rows in my input csv do not contain three columns. After
 building my JavaRDD<Row>, I create a data frame as shown below using the
 RDD and schema.



 DataFrame darDataFrame = sqlContext.createDataFrame(rowRDD, schema);



 Finally I try to save it as Parquet file



 darDataFrame.saveAsParquetFile("/home/anand/output.parquet")



 I get this error when saving it as Parquet file



 java.lang.IndexOutOfBoundsException: Trying to write more fields than
 contained in row (3 > 2)



 I understand the reason behind this error. Some of my rows in the Row RDD do
 not contain three elements because some rows in my input csv do not contain
 three columns. But while building the schema, I am specifying every field
 as nullable, so I believe it should not throw this error. Can anyone help
 me fix this error? Thank you.
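
 One hedged sketch of a way around this (not from the thread): pad each parsed CSV
 line to the schema length so every Row carries exactly three values, using null
 for the missing ones. Written in Scala for brevity (the snippet above is Java);
 `lines` is a hypothetical RDD[String] of the raw CSV:

 import org.apache.spark.sql.Row

 val rowRDD = lines.map { line =>
   val parts = line.split(",", -1)
   // always emit 3 fields; absent or empty columns become null
   Row.fromSeq((0 until 3).map { i =>
     if (i < parts.length && parts(i).nonEmpty) parts(i).toDouble else null
   })
 }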



 Regards,

 Anand.C









 --

 Best Regards,
 Ayan Guha



Re: Broadcast variables can be rebroadcast?

2015-05-19 Thread Imran Rashid
hmm, I guess it depends on the way you look at it.  In a way, I'm saying
that spark does *not* have any built in auto-re-broadcast if you try to
mutate a broadcast variable.  Instead, you should create something new, and
just broadcast it separately.  Then just have all the code you have
operating on your RDDs look at the new broadcast variable.

But I guess there is another way to look at it -- you are creating new
broadcast variables each time, but they all point to the same underlying
mutable data structure.  So in a way, you are rebroadcasting the same
underlying data structure.

Let me expand my example from earlier a little bit more:


def oneIteration(myRDD: RDD[...], myBroadcastVar: Broadcast[...]): Unit = {
 ...
}

// this is a val, because the data structure itself is mutable
val myMutableDataStructure = ...
// this is a var, because you will create new broadcasts
var myBroadcast = sc.broadcast(myMutableDataStructure)
(0 to 20).foreach { iteration =>
  oneIteration(myRDD, myBroadcast)
  // update your mutable data structure in place
  myMutableDataStructure.update(...)
  // ... but that doesn't affect the broadcast variables living out on the
cluster, so we need to
  // create a new one

  // this line is not required -- the broadcast var will automatically get
unpersisted when a gc
  // cleans up the old broadcast on the driver, but I'm including this here
for completeness,
  // in case you want to more proactively clean up old blocks if you are
low on space
  myBroadcast.unpersist()

  // now we create a new broadcast which has the updated data in our
mutable data structure
  myBroadcast = sc.broadcast(myMutableDataStructure)
}


hope this clarifies things!

Imran

On Tue, May 19, 2015 at 3:06 AM, N B nb.nos...@gmail.com wrote:

 Hi Imran,

 If I understood you correctly, you are suggesting to simply call broadcast
 again from the driver program. This is exactly what I am hoping will work
 as I have the Broadcast data wrapped up and I am indeed (re)broadcasting
 the wrapper over again when the underlying data changes. However,
 documentation seems to suggest that one cannot re-broadcast. Is my
 understanding accurate?

 Thanks
 NB


 On Mon, May 18, 2015 at 6:24 PM, Imran Rashid iras...@cloudera.com
 wrote:

 Rather than updating the broadcast variable, can't you simply create a
 new one?  When the old one can be gc'ed in your program, it will also get
 gc'ed from spark's cache (and all executors).

 I think this will make your code *slightly* more complicated, as you need
 to add in another layer of indirection for which broadcast variable to use,
 but not too bad.  Eg., from

 var myBroadcast = sc.broadcast( ...)
 (0 to 20).foreach{ iteration =>
   //  ... some rdd operations that involve myBroadcast ...
   myBroadcast.update(...) // wrong! don't update a broadcast variable
 }

 instead do something like:

 def oneIteration(myRDD: RDD[...], myBroadcastVar: Broadcast[...]): Unit =
 {
  ...
 }

 var myBroadcast = sc.broadcast(...)
 (0 to 20).foreach { iteration =>
   oneIteration(myRDD, myBroadcast)
   myBroadcast = sc.broadcast(...) // reassign the outer var: create a NEW broadcast here,
 with whatever you need to update it
 }

 On Sat, May 16, 2015 at 2:01 AM, N B nb.nos...@gmail.com wrote:

 Thanks Ayan. Can we rebroadcast after updating in the driver?

 Thanks
 NB.


 On Fri, May 15, 2015 at 6:40 PM, ayan guha guha.a...@gmail.com wrote:

 Hi

 Broadcast variables are shipped the first time they are accessed in a
 transformation to the executors used by that transformation. They will NOT be
 updated subsequently, even if the value has changed. However, a new value
 will be shipped to any new executor that comes into play after the value has
 changed. For this reason, changing the value of a broadcast variable is not a
 good idea as it can create inconsistency within the cluster. From the documentation:

  In addition, the object v should not be modified after it is
 broadcast in order to ensure that all nodes get the same value of the
 broadcast variable


 On Sat, May 16, 2015 at 10:39 AM, N B nb.nos...@gmail.com wrote:

 Thanks Ilya. Does one have to call broadcast again once the underlying
 data is updated in order to get the changes visible on all nodes?

 Thanks
 NB


 On Fri, May 15, 2015 at 5:29 PM, Ilya Ganelin ilgan...@gmail.com
 wrote:

 The broadcast variable is like a pointer. If the underlying data
 changes then the changes will be visible throughout the cluster.
 On Fri, May 15, 2015 at 5:18 PM NB nb.nos...@gmail.com wrote:

 Hello,

 Once a broadcast variable is created using sparkContext.broadcast(),
 can it
 ever be updated again? The use case is for something like the
 underlying
 lookup data changing over time.

 Thanks
 NB




 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Broadcast-variables-can-be-rebroadcast-tp22908.html
 Sent from the Apache Spark User List mailing list archive at
 Nabble.com.

 -
 To unsubscribe, 

Re: How to use spark to access HBase with Security enabled

2015-05-19 Thread donhoff_h
The principal is sp...@bgdt.dev.hrb. It is the user that I used to run my spark 
programs. I am sure I have run the kinit command to make it take effect. And I 
also used the HBase Shell to verify that this user has the right to scan and 
put the tables in HBase.


Now I still have no idea how to solve this problem. Can anybody help me to 
figure it out? Many Thanks!


------------------ Original message ------------------
From: yuzhihong <yuzhih...@gmail.com>
Date: May 19, 2015, 7:55
To: donhoff_h <165612...@qq.com>
Cc: user <user@spark.apache.org>
Subject: Re: How to use spark to access HBase with Security enabled



Which user did you run your program as ?


Have you granted proper permission on hbase side ?


You should also check master log to see if there was some clue. 


Cheers




On May 19, 2015, at 2:41 AM, donhoff_h 165612...@qq.com wrote:


Hi, experts.

I ran the HBaseTest program which is an example from the Apache Spark source 
code to learn how to use spark to access HBase. But I met the following 
exception:
Exception in thread main 
org.apache.hadoop.hbase.client.RetriesExhaustedException: Failed after 
attempts=36, exceptions:
Tue May 19 16:59:11 CST 2015, null, java.net.SocketTimeoutException: 
callTimeout=6, callDuration=68648: row 'spark_t01,,00' on table 
'hbase:meta' at region=hbase:meta,,1.1588230740, 
hostname=bgdt01.dev.hrb,16020,1431412877700, seqNum=0

I also checked the RegionServer Log of the host bgdt01.dev.hrb listed in the 
above exception. I found a few entries like the following one:
2015-05-19 16:59:11,143 DEBUG 
[RpcServer.reader=2,bindAddress=bgdt01.dev.hrb,port=16020] ipc.RpcServer: 
RpcServer.listener,port=16020: Caught exception while reading:Authentication is 
required 

The above entry did not point to my program clearly, but the time is very close. 
Since my HBase version is 1.0.0 and I have security enabled, I suspect the 
exception was caused by Kerberos authentication, but I am not sure.

Does anybody know if my guess is right? And if I am right, could anybody tell me 
how to set up Kerberos authentication in a Spark program? I don't know how to do 
it. I already checked the API doc, but did not find any useful API. Many 
Thanks!
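
A hedged sketch only (not a verified fix): with a kerberized HBase, one common
starting point is to log in from a keytab on the driver before creating the HBase
configuration. The principal and keytab path below are placeholders, and on a real
cluster the executors may additionally need HBase delegation tokens:

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.security.UserGroupInformation

val hbaseConf = HBaseConfiguration.create()
hbaseConf.set("hbase.security.authentication", "kerberos")
UserGroupInformation.setConfiguration(hbaseConf)
// placeholder principal/keytab -- replace with your own
UserGroupInformation.loginUserFromKeytab("spark@EXAMPLE.COM", "/path/to/spark.keytab")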

By the way, my spark version is 1.3.0. I also paste the code of HBaseTest in 
the following:
***Source Code**
object HBaseTest {
  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setAppName("HBaseTest")
    val sc = new SparkContext(sparkConf)
    val conf = HBaseConfiguration.create()
    conf.set(TableInputFormat.INPUT_TABLE, args(0))

    // Initialize the HBase table if necessary
    val admin = new HBaseAdmin(conf)
    if (!admin.isTableAvailable(args(0))) {
      val tableDesc = new HTableDescriptor(args(0))
      admin.createTable(tableDesc)
    }

    val hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
      classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
      classOf[org.apache.hadoop.hbase.client.Result])

    hBaseRDD.count()

    sc.stop()
  }
}

RE: Decision tree: categorical variables

2015-05-19 Thread Keerthi
Hi ,

Can you please share how you resolved the parsing issue? It would be of great
help...

Thanks.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Decision-tree-categorical-variables-tp12433p22943.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark SQL on large number of columns

2015-05-19 Thread madhu phatak
Hi,
Another update: when run on more than 1000 columns I am getting

Could not write class
__wrapper$1$40255d281a0d4eacab06bcad6cf89b0d/__wrapper$1$40255d281a0d4eacab06bcad6cf89b0d$$anonfun$wrapper$1$$anon$1
because it exceeds JVM code size limits. Method apply's code too large!
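
A hedged alternative sketch (assuming the DataFrame in question is `df`, all the
columns can be cast to Double, and there are no nulls): MLlib's one-pass column
statistics give max/min/mean/count without building a single enormous select,
which sidesteps the analyzer and codegen limits seen above:

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.stat.Statistics

// one dense vector per row
val vectors = df.rdd.map(row => Vectors.dense(row.toSeq.map(_.toString.toDouble).toArray))
val summary = Statistics.colStats(vectors)
// summary.max, summary.min, summary.mean and summary.count cover the same stats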






Regards,
Madhukara Phatak
http://datamantra.io/

On Tue, May 19, 2015 at 6:23 PM, madhu phatak phatak@gmail.com wrote:

 Hi,
 Tested with HiveContext also. It also take similar amount of time.

 To make things clear, the following is the select clause for a given column:


 aggregateStats($columnName, max(cast($columnName as double)),
   min(cast($columnName as double)), avg(cast($columnName as double)), count(*))

 aggregateStats is a UDF generating a case class to hold the values.








 Regards,
 Madhukara Phatak
 http://datamantra.io/

 On Tue, May 19, 2015 at 5:57 PM, madhu phatak phatak@gmail.com
 wrote:

 Hi,
 Tested for calculating values for 300 columns. Analyser takes around 4
 minutes to generate the plan. Is this normal?




 Regards,
 Madhukara Phatak
 http://datamantra.io/

 On Tue, May 19, 2015 at 4:35 PM, madhu phatak phatak@gmail.com
 wrote:

 Hi,
 I am using spark 1.3.1




 Regards,
 Madhukara Phatak
 http://datamantra.io/

 On Tue, May 19, 2015 at 4:34 PM, Wangfei (X) wangf...@huawei.com
 wrote:

  And which version are you using

 发自我的 iPhone

 在 2015年5月19日,18:29,ayan guha guha.a...@gmail.com 写道:

   can you kindly share your code?

 On Tue, May 19, 2015 at 8:04 PM, madhu phatak phatak@gmail.com
 wrote:

 Hi,
 I am trying to run a Spark SQL aggregation on a file with 26k columns. The
 number of rows is very small. I am running into an issue where Spark takes a
 huge amount of time to parse the SQL and create a logical plan. Even if I have
 just one row, it takes more than 1 hour just to get past the parsing.
 Any idea how to optimize these kinds of scenarios?


  Regards,
  Madhukara Phatak
 http://datamantra.io/




  --
 Best Regards,
 Ayan Guha







Re: Reading Real Time Data only from Kafka

2015-05-19 Thread Akhil Das
Cool. Thanks for the detailed response Cody.

Thanks
Best Regards

On Tue, May 19, 2015 at 6:43 PM, Cody Koeninger c...@koeninger.org wrote:

 If those questions aren't answered by

 https://github.com/koeninger/kafka-exactly-once/blob/master/blogpost.md

 please let me know so I can update it.

 If you set auto.offset.reset to largest, it will start at the largest
 offset.  Any messages before that will be skipped, so if prior runs of the
 job didn't consume them, they're lost.

 KafkaRDD / DirectStream doesn't make any scheduling decisions (aside from
 a locality hint if you have kafka running on the same node as spark), and
 it doesn't have any long-running receivers.  Executors get whatever
 partitions the normal scheduler decides they should get.  If an executor
 fails, a different executor reads the offset range for the failed
 partition; they're immutable, so no difference in result.

 Deciding where to save offsets (or not) is up to you.  You can checkpoint,
 or store them yourself.
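
 A minimal sketch of the direct stream setup being discussed (broker and topic
 names are placeholders; auto.offset.reset and the per-partition rate limit are
 the knobs mentioned above):

 import kafka.serializer.StringDecoder
 import org.apache.spark.SparkConf
 import org.apache.spark.streaming.{Seconds, StreamingContext}
 import org.apache.spark.streaming.kafka.KafkaUtils

 val conf = new SparkConf()
   .setAppName("direct-stream-sketch")
   .set("spark.streaming.kafka.maxRatePerPartition", "10000") // cap per partition
 val ssc = new StreamingContext(conf, Seconds(10))

 val kafkaParams = Map(
   "metadata.broker.list" -> "broker1:9092,broker2:9092",     // placeholder brokers
   "auto.offset.reset" -> "largest")                          // or "smallest"
 val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
   ssc, kafkaParams, Set("mytopic"))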

 On Mon, May 18, 2015 at 12:00 PM, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 I have played a bit with the directStream kafka api. Good work cody.
 These are my findings and also can you clarify a few things for me (see
 below).

 - When auto.offset.reset is set to smallest and you have 60GB of messages in
 Kafka, it takes forever as it reads the whole 60GB at once; largest will
 only read the latest messages.
 - To avoid this, you can actually limit the rate with
 spark.streaming.kafka.maxRatePerPartition, which is pretty stable (Always
 reads the same amount of data).
 - Number of partitions per batch = number of kafka partitions.

 - In the case of driver failures, offset reset being set to smallest
 will replay the whole messages and largest will only read those messages
 which are pushed after the streaming job has started. What happens to those
 messages which arrive in between?

 *Few things which are unclear:*

 - If we have a kafka topic with 9 partitions, and a spark cluster with 3
 slaves, how does it decide which slave should read from which partition?
 And what happens if a single slave fails while reading the data?

 - By default it doesn't push the offsets of messages which are read
 anywhere, then how does it replay the message in case of failures?

 Thanks
 Best Regards

 On Wed, May 13, 2015 at 8:32 PM, Cody Koeninger c...@koeninger.org
 wrote:

 You linked to a google mail tab, not a public archive, so I don't know
 exactly which conversation you're referring to.

 As far as I know, streaming only runs a single job at a time in the
 order they were defined, unless you turn on an experimental option for more
 parallelism (TD or someone more knowledgeable can chime in on this).  If
 you're talking about the possibility of the next job starting before the
 prior one has fully finished, because your processing is lagging behind...
 I'm not 100% sure this is possible because I've never observed it.

 The thing is, it's a moot point, because if you're saving offsets
 yourself transactionally, you already need to be verifying that offsets are
 correct (increasing without gaps) in order to handle restarts correctly.

 If you're super concerned about how batches get generated, the direct
 api gives you access to KafkaUtils.createRDD... just schedule your own rdds
 in the order you want.  Again, flexible.




 On Wed, May 13, 2015 at 9:36 AM, Dibyendu Bhattacharya 
 dibyendu.bhattach...@gmail.com wrote:

 Thanks Cody for your email. I think my concern was not to get the
 ordering of message within a partition , which as you said is possible if
 one knows how Spark works. The issue is how Spark schedule jobs on every
 batch  which is not on the same order they generated. So if that is not
 guaranteed it does not matter if you manege order within your partition. So
 depends on par-partition ordering to commit offset may leads to offsets
 commit in wrong order.

 In this thread you have discussed this as well and some workaround  :


 https://mail.google.com/mail/u/1/?tab=wm#search/rdd+order+guarantees/14b9f1eaf0b8bd15

 So again , one need to understand every details of a Consumer to take a
 decision if that solves their use case.

 Regards,
 Dibyendu

 On Wed, May 13, 2015 at 7:35 PM, Cody Koeninger c...@koeninger.org
 wrote:

 As far as I can tell, Dibyendu's cons boil down to:

 1. Spark checkpoints can't be recovered if you upgrade code
 2. Some Spark transformations involve a shuffle, which can repartition
 data

 It's not accurate to imply that either one of those things are
 inherently cons of the direct stream api.

 Regarding checkpoints, nothing about the direct stream requires you to
 use checkpoints.  You can save offsets in a checkpoint, your own database,
 or not save offsets at all (as James wants).  One might even say that the
 direct stream api is . . . flexible . . . in that regard.

 Regarding partitions, the direct stream api gives you the same
 ordering guarantee as Kafka, namely that within a 

Re: Spark and Flink

2015-05-19 Thread Till Rohrmann
I guess it's a typo: eu.stratosphere should be replaced by
org.apache.flink

On Tue, May 19, 2015 at 1:13 PM, Alexander Alexandrov 
alexander.s.alexand...@gmail.com wrote:

 We managed to do this with the following config:

  // properties
  <!-- Hadoop -->
  <hadoop.version>2.2.0</hadoop.version>
  <!-- Flink -->
  <flink.version>0.9-SNAPSHOT</flink.version>
  <!-- Spark -->
  <spark.version>1.2.1</spark.version>

  // from the dependency management
  <!-- Hadoop -->
  <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-common</artifactId>
      <version>${hadoop.version}</version>
      <scope>provided</scope>
  </dependency>
  <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-hdfs</artifactId>
      <version>${hadoop.version}</version>
      <scope>provided</scope>
  </dependency>

  <!-- Flink -->
  <dependency>
      <groupId>eu.stratosphere</groupId>
      <artifactId>flink-scala</artifactId>
      <version>${flink.version}</version>
      <scope>provided</scope>
  </dependency>
  <dependency>
      <groupId>eu.stratosphere</groupId>
      <artifactId>flink-java</artifactId>
      <version>${flink.version}</version>
      <scope>provided</scope>
  </dependency>
  <dependency>
      <groupId>eu.stratosphere</groupId>
      <artifactId>flink-clients</artifactId>
      <version>${flink.version}</version>
      <scope>provided</scope>
  </dependency>

  <!-- Spark -->
  <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_${scala.tools.version}</artifactId>
      <version>${spark.version}</version>
      <scope>provided</scope>
  </dependency>

  <!-- Jetty -->
  <dependency>
      <groupId>org.eclipse.jetty</groupId>
      <artifactId>jetty-util</artifactId>
      <version>${jetty.version}</version>
  </dependency>
  <dependency>
      <groupId>org.eclipse.jetty</groupId>
      <artifactId>jetty-servlet</artifactId>
      <version>${jetty.version}</version>
  </dependency>

  // actual dependencies
  <!-- Spark -->
  <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_${scala.tools.version}</artifactId>
  </dependency>

  <!-- Flink -->
  <dependency>
      <groupId>eu.stratosphere</groupId>
      <artifactId>flink-scala</artifactId>
  </dependency>
  <dependency>
      <groupId>eu.stratosphere</groupId>
      <artifactId>flink-java</artifactId>
  </dependency>
  <dependency>
      <groupId>eu.stratosphere</groupId>
      <artifactId>flink-clients</artifactId>
  </dependency>
  <!-- FIXME: this is a hacky solution for a Flink issue with the Jackson deps -->
  <dependency>
      <groupId>com.fasterxml.jackson.core</groupId>
      <artifactId>jackson-core</artifactId>
      <version>2.2.1</version>
      <scope>provided</scope>
  </dependency>
  <dependency>
      <groupId>com.fasterxml.jackson.core</groupId>
      <artifactId>jackson-databind</artifactId>
      <version>2.2.1</version>
      <scope>provided</scope>
  </dependency>
  <dependency>
      <groupId>com.fasterxml.jackson.core</groupId>
      <artifactId>jackson-annotations</artifactId>
      <version>2.2.1</version>
      <scope>provided</scope>
  </dependency>


 2015-05-19 10:06 GMT+02:00 Pa Rö paul.roewer1...@googlemail.com:

 That sounds good. Maybe you can send me a pseudo structure; this is my first
 maven project.

 best regards,
 paul

 2015-05-18 14:05 GMT+02:00 Robert Metzger rmetz...@apache.org:

 Hi,
 I would really recommend you to put your Flink and Spark dependencies
 into different maven modules.
 Having them both in the same project will be very hard, if not
 impossible.
 Both projects depend on similar projects with slightly different
 versions.

 I would suggest a maven module structure like this:
 yourproject-parent (a pom module)
 -- yourproject-common
 -- yourproject-flink
 -- yourproject-spark



 On Mon, May 18, 2015 at 10:00 AM, Pa Rö paul.roewer1...@googlemail.com
 wrote:

 hi,
 if I add your dependency I get over 100 errors; now I changed the
 version number:
 <dependencies>
     <dependency>
         <groupId>com.fasterxml.jackson.module</groupId>
         <artifactId>jackson-module-scala_2.10</artifactId>
         <version>2.4.4</version>
         <exclusions>
             <exclusion>
                 <groupId>com.google.guava</groupId>
                 <artifactId>guava</artifactId>
             </exclusion>
 

Hive in IntelliJ

2015-05-19 Thread Heisenberg Bb
I was trying to implement this example:
http://spark.apache.org/docs/1.3.1/sql-programming-guide.html#hive-tables

It worked well when I built spark in terminal using command specified:
http://spark.apache.org/docs/1.3.1/building-spark.html#building-with-hive-and-jdbc-support

But when I try to implement it in IntelliJ, following the setup
described here:
https://cwiki.apache.org/confluence/display/SPARK/Useful+Developer+Tools#UsefulDeveloperTools-IntelliJ

It throws the error:
val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
console:21: error: object hive is not a member of package
org.apache.spark.sql

Can anyone help me get through this issue?

Regards
Akhil


Re: Spark SQL on large number of columns

2015-05-19 Thread madhu phatak
Hi,
Tested with HiveContext also. It also take similar amount of time.

To make things clear, the following is the select clause for a given column:


aggregateStats($columnName, max(cast($columnName as double)),
  min(cast($columnName as double)), avg(cast($columnName as double)),
  count(*))

aggregateStats is a UDF generating a case class to hold the values.








Regards,
Madhukara Phatak
http://datamantra.io/

On Tue, May 19, 2015 at 5:57 PM, madhu phatak phatak@gmail.com wrote:

 Hi,
 Tested for calculating values for 300 columns. Analyser takes around 4
 minutes to generate the plan. Is this normal?




 Regards,
 Madhukara Phatak
 http://datamantra.io/

 On Tue, May 19, 2015 at 4:35 PM, madhu phatak phatak@gmail.com
 wrote:

 Hi,
 I am using spark 1.3.1




 Regards,
 Madhukara Phatak
 http://datamantra.io/

 On Tue, May 19, 2015 at 4:34 PM, Wangfei (X) wangf...@huawei.com wrote:

  And which version are you using

 发自我的 iPhone

 在 2015年5月19日,18:29,ayan guha guha.a...@gmail.com 写道:

   can you kindly share your code?

 On Tue, May 19, 2015 at 8:04 PM, madhu phatak phatak@gmail.com
 wrote:

 Hi,
 I am trying to run a Spark SQL aggregation on a file with 26k columns. The
 number of rows is very small. I am running into an issue where Spark takes a
 huge amount of time to parse the SQL and create a logical plan. Even if I have
 just one row, it takes more than 1 hour just to get past the parsing.
 Any idea how to optimize these kinds of scenarios?


  Regards,
  Madhukara Phatak
 http://datamantra.io/




  --
 Best Regards,
 Ayan Guha






Re: Reading Real Time Data only from Kafka

2015-05-19 Thread Cody Koeninger
If those questions aren't answered by

https://github.com/koeninger/kafka-exactly-once/blob/master/blogpost.md

please let me know so I can update it.

If you set auto.offset.reset to largest, it will start at the largest
offset.  Any messages before that will be skipped, so if prior runs of the
job didn't consume them, they're lost.

KafkaRDD / DirectStream doesn't make any scheduling decisions (aside from a
locality hint if you have kafka running on the same node as spark), and it
doesn't have any long-running receivers.  Executors get whatever partitions
the normal scheduler decides they should get.  If an executor fails, a
different executor reads the offset range for the failed partition; they're
immutable, so no difference in result.

Deciding where to save offsets (or not) is up to you.  You can checkpoint,
or store them yourself.

On Mon, May 18, 2015 at 12:00 PM, Akhil Das ak...@sigmoidanalytics.com
wrote:

 I have played a bit with the directStream kafka api. Good work cody. These
 are my findings and also can you clarify a few things for me (see below).

 - When auto.offset.reset is set to smallest and you have 60GB of messages in
 Kafka, it takes forever as it reads the whole 60GB at once; largest will
 only read the latest messages.
 - To avoid this, you can actually limit the rate with
 spark.streaming.kafka.maxRatePerPartition, which is pretty stable (Always
 reads the same amount of data).
 - Number of partitions per batch = number of kafka partitions.

 - In the case of driver failures, offset reset being set to smallest
 will replay the whole messages and largest will only read those messages
 which are pushed after the streaming job has started. What happens to those
 messages which arrive in between?

 *Few things which are unclear:*

 - If we have a kafka topic with 9 partitions, and a spark cluster with 3
 slaves, how does it decide which slave should read from which partition?
 And what happens if a single slave fails while reading the data?

 - By default it doesn't push the offsets of messages which are read
 anywhere, then how does it replay the message in case of failures?

 Thanks
 Best Regards

 On Wed, May 13, 2015 at 8:32 PM, Cody Koeninger c...@koeninger.org
 wrote:

 You linked to a google mail tab, not a public archive, so I don't know
 exactly which conversation you're referring to.

 As far as I know, streaming only runs a single job at a time in the order
 they were defined, unless you turn on an experimental option for more
 parallelism (TD or someone more knowledgeable can chime in on this).  If
 you're talking about the possibility of the next job starting before the
 prior one has fully finished, because your processing is lagging behind...
 I'm not 100% sure this is possible because I've never observed it.

 The thing is, it's a moot point, because if you're saving offsets
 yourself transactionally, you already need to be verifying that offsets are
 correct (increasing without gaps) in order to handle restarts correctly.

 If you're super concerned about how batches get generated, the direct api
 gives you access to KafkaUtils.createRDD... just schedule your own rdds in
 the order you want.  Again, flexible.




 On Wed, May 13, 2015 at 9:36 AM, Dibyendu Bhattacharya 
 dibyendu.bhattach...@gmail.com wrote:

 Thanks Cody for your email. I think my concern was not to get the
 ordering of message within a partition , which as you said is possible if
 one knows how Spark works. The issue is how Spark schedule jobs on every
 batch  which is not on the same order they generated. So if that is not
 guaranteed it does not matter if you manege order within your partition. So
 depends on par-partition ordering to commit offset may leads to offsets
 commit in wrong order.

 In this thread you have discussed this as well and some workaround  :


 https://mail.google.com/mail/u/1/?tab=wm#search/rdd+order+guarantees/14b9f1eaf0b8bd15

 So again , one need to understand every details of a Consumer to take a
 decision if that solves their use case.

 Regards,
 Dibyendu

 On Wed, May 13, 2015 at 7:35 PM, Cody Koeninger c...@koeninger.org
 wrote:

 As far as I can tell, Dibyendu's cons boil down to:

 1. Spark checkpoints can't be recovered if you upgrade code
 2. Some Spark transformations involve a shuffle, which can repartition
 data

 It's not accurate to imply that either one of those things are
 inherently cons of the direct stream api.

 Regarding checkpoints, nothing about the direct stream requires you to
 use checkpoints.  You can save offsets in a checkpoint, your own database,
 or not save offsets at all (as James wants).  One might even say that the
 direct stream api is . . . flexible . . . in that regard.

 Regarding partitions, the direct stream api gives you the same ordering
 guarantee as Kafka, namely that within a given partition messages will be
 in increasing offset order.   Clearly if you do a transformation that
 repartitions the stream, that no longer holds.  Thing 

Re: spark streaming doubt

2015-05-19 Thread Shushant Arora
So for Kafka + Spark Streaming, receiver-based streaming uses the high-level API
and non-receiver-based streaming uses the low-level API.

1. In high-level receiver-based streaming, does it register consumers at
each job start (whenever a new job is launched by the streaming application, say
every second)?
2. Will the number of executors in high-level receiver-based jobs always equal the
number of partitions in the topic?
3. Will data from a single topic be consumed by executors in parallel, or does only
one receiver consume in multiple threads and assign work to executors in the
high-level receiver-based approach?




On Tue, May 19, 2015 at 2:38 PM, Akhil Das ak...@sigmoidanalytics.com
wrote:

 spark.streaming.concurrentJobs takes an integer value, not a boolean. If
 you set it to 2, then 2 jobs will run in parallel. The default value is 1, and the
 next job will start once the current one completes.


 Actually, in the current implementation of Spark Streaming and under
 default configuration, only one job is active (i.e. under execution) at any
 point of time. So if one batch's processing takes longer than 10 seconds,
 then the next batch's jobs will stay queued.
 This can be changed with an experimental Spark property
 spark.streaming.concurrentJobs which is by default set to 1. Its not
 currently documented (maybe I should add it).
 The reason it is set to 1 is that concurrent jobs can potentially lead to
 weird sharing of resources and which can make it hard to debug the whether
 there is sufficient resources in the system to process the ingested data
 fast enough. With only 1 job running at a time, it is easy to see that if
 batch processing time < batch interval, then the system will be stable.
 Granted that this may not be the most efficient use of resources under
 certain conditions. We definitely hope to improve this in the future.


 Copied from TD's answer written in SO
 http://stackoverflow.com/questions/23528006/how-jobs-are-assigned-to-executors-in-spark-streaming
 .
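
 A minimal sketch of turning that knob (the value 2 is only an example; as noted
 above the property is experimental):

 import org.apache.spark.SparkConf
 import org.apache.spark.streaming.{Seconds, StreamingContext}

 // allow 2 streaming jobs to run concurrently (default is 1)
 val conf = new SparkConf().set("spark.streaming.concurrentJobs", "2")
 val ssc = new StreamingContext(conf, Seconds(1))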

 Non-receiver based streaming for example you can say are the fileStream,
 directStream ones. You can read a bit of information from here
 https://spark.apache.org/docs/1.3.1/streaming-kafka-integration.html

 Thanks
 Best Regards

 On Tue, May 19, 2015 at 2:13 PM, Shushant Arora shushantaror...@gmail.com
  wrote:

 Thanks Akhil.
 When I don't set spark.streaming.concurrentJobs, will all the
 pending jobs start one by one after the first job completes, or does it not
 create jobs which could not be started at their desired interval?

 And Whats the difference and usage of Receiver vs non-receiver based
 streaming. Is there any documentation for that?

 On Tue, May 19, 2015 at 1:35 PM, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 It will be a single job running at a time by default (you can also
 configure the spark.streaming.concurrentJobs to run jobs parallel which is
 not recommended to put in production).

 Now, your batch duration being 1 sec and processing time being 2
 minutes, if you are using a receiver based streaming then ideally those
 receivers will keep on receiving data while the job is running (which will
 accumulate in memory if you set StorageLevel as MEMORY_ONLY and end up in
 block not found exceptions as spark drops some blocks which are yet to
 process to accumulate new blocks). If you are using a non-receiver based
 approach, you will not have this problem of dropping blocks.

 Ideally, if your data is small and you have enough memory to hold your
 data then it will run smoothly without any issues.

 Thanks
 Best Regards

 On Tue, May 19, 2015 at 1:23 PM, Shushant Arora 
 shushantaror...@gmail.com wrote:

 What happens if, in a streaming application, one job is not yet finished
 when the stream interval is reached? Does it start the next job, or does it wait
 for the first to finish while the remaining jobs keep accumulating in a queue?


 Say I have a streaming application with a stream interval of 1 sec, but
 my job takes 2 min to process 1 sec of stream data. What will happen? At any
 time will there be only one job running, or multiple?







Re: PySpark Job throwing IOError

2015-05-19 Thread Muralidhar, Nikhil
Hello all,
  I have an error in pyspark for which I have not the faintest idea of the 
cause. All I can tell from the stack trace is that it can't find a pyspark file 
on the path /mnt/spark-*/pyspark-*. Apart from that I need someone more  
experienced than me with Spark to look into it and help diagnose the problem 
and suggest potential solutions, hence I am looking to this group for help.


If anyone wants to read the same question on Stack Overflow here is the link:  
http://stackoverflow.com/questions/30328104/pyspark-job-throwing-ioerror

Here's the same thing pasted as raw text:


I am trying to write a simple KNN job using pyspark on an HDFS cluster. I am 
using very few input files to perform the job, so I don't think it's a memory 
(space) issue. I do not do a broadcast in any part of my code, so it is surprising 
to me that broadcast.py fails. I do, however, have python dictionaries that I 
keep in shared memory without explicitly doing a broadcast.

Can anyone help me understand what is going on?

I have appended my python file and the stack trace to this email.


Thanks,

Nikhil

from pyspark.mllib.linalg import SparseVector
from pyspark import SparkContext
import glob
import sys
import time
import subprocess
from itertools import combinations

# We create user and item indices starting from 0 to #users and 0 to #items
# respectively. This is done to store them in sparse vectors as dicts.
def create_indices(inputdir):
    items=dict()
    user_id_to_idx=dict()
    user_idx_to_id=dict()
    item_idx_to_id=dict()
    item_id_to_idx=dict()
    item_idx=0
    user_idx=0

    cat=subprocess.Popen(["hadoop","fs","-cat","/user/hadoop/"+inputdir+"/*.txt"],stdout=subprocess.PIPE)
    for line in cat.stdout:
        toks=map(str,line.strip().split("\t"))
        try:
            user_id_to_idx[toks[1].strip()]
        except KeyError:
            if toks[1].strip()!=None:
                user_id_to_idx[toks[1].strip()]=user_idx
                user_idx_to_id[user_idx]=toks[1].strip()
                user_idx+=1
        try:
            item_id_to_idx[toks[0].strip()]
        except KeyError:
            if toks[0].strip()!=None:
                item_id_to_idx[toks[0].strip()]=item_idx
                item_idx_to_id[item_idx]=toks[0].strip()
                item_idx+=1
    return user_idx_to_id,user_id_to_idx,item_idx_to_id,item_id_to_idx,user_idx,item_idx

def concat_helper(a,b):
    if(a!= None and b!=None):
        print a,b,a.update(b)
        temp=dict()
        temp.update(a)
        temp.update(b)
        return temp
    elif a!=None:
        return a
    elif b!=None:
        return b

# pass in the hdfs path to the input files and the spark context.
def runKNN(inputdir,sc,user_id_to_idx,item_id_to_idx):
    rdd_text=sc.textFile(inputdir)
    try:
        new_rdd = rdd_text.map(lambda x: (item_id_to_idx[str(x.strip().split("\t")[0])],{user_id_to_idx[str(x.strip().split("\t")[1])]:1})).reduceByKey(lambda x,y: concat_helper(x,y)).sortByKey()
    except KeyError:
        print item_id_to_idx.keys()
        pass
    return new_rdd

if __name__=="__main__":
    sc = SparkContext()
    u_idx_to_id,u_id_to_idx,i_idx_to_id,i_id_to_idx,u_idx,i_idx=create_indices(sys.argv[1])

    u_idx_to_id_b=sc.broadcast(u_idx_to_id)
    u_id_to_idx_b=sc.broadcast(u_id_to_idx)
    i_idx_to_idx_b=sc.broadcast(i_idx_to_id)
    i_id_to_idx_b=sc.broadcast(i_id_to_idx)
    num_users=sc.broadcast(u_idx)
    num_items=sc.broadcast(i_idx)
    item_dict_rdd=runKNN(sys.argv[1],sc,u_id_to_idx,i_id_to_idx)

    item_dict_rdd_new=item_dict_rdd.map(lambda x: (x[0],SparseVector(i_idx,x[1])))
    item_dict_rdd_new.saveAsTextFile("hdfs://output_path")
    # Note: dot_products_rdd is only defined in the commented-out line below, so
    # the last saveAsTextFile call would fail with a NameError as written.
    #dot_products_rdd=map(lambda (x,y): (x,y),combinations(item_dict_rdd_new.map(lambda x: x),2))
    dot_products_rdd.saveAsTextFile("hdfs://output_path_2")

stacktrace
Description: stacktrace

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org

Re: spark streaming doubt

2015-05-19 Thread Akhil Das
On Tue, May 19, 2015 at 8:10 PM, Shushant Arora shushantaror...@gmail.com
wrote:

 So for Kafka+spark streaming, Receiver based streaming used highlevel api
 and non receiver based streaming used low level api.

 1.In high level receiver based streaming does it registers consumers at
 each job start(whenever a new job is launched by streaming application say
 at each second)?


​- Receiver based streaming will always have the receiver running parallel
while your job is running, So by default for every 200ms
(spark.streaming.blockInterval) the receiver will generate a block of data
which is read from Kafka.
​


 2.No of executors in highlevel receiver based jobs will always equal to no
 of partitions in topic ?


​- Not sure from where did you came up with this. For the non stream based
one, i think the number of partitions in spark will be equal to the number
of kafka partitions for the given topic.
​


 3.Will data from a single topic be consumed by executors in parllel or
 only one receiver consumes in multiple threads and assign to executors in
 high level receiver based approach ?

 ​- They will consume the data parallel.​ For the receiver based approach,
you can actually specify the number of receiver that you want to spawn for
consuming the messages.
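
A rough sketch of spawning several receiver-based Kafka streams and unioning them
(ZooKeeper quorum, group id and topic name are placeholders; `ssc` is assumed to
already exist):

import org.apache.spark.streaming.kafka.KafkaUtils

// three receivers, each reading the topic with 3 consumer threads
val streams = (1 to 3).map { _ =>
  KafkaUtils.createStream(ssc, "zkhost:2181", "my-group", Map("mytopic" -> 3))
}
val unified = ssc.union(streams)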




 On Tue, May 19, 2015 at 2:38 PM, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 spark.streaming.concurrentJobs takes an integer value, not boolean. If
 you set it as 2 then 2 jobs will run parallel. Default value is 1 and the
 next job will start once it completes the current one.


 Actually, in the current implementation of Spark Streaming and under
 default configuration, only one job is active (i.e. under execution) at any
 point of time. So if one batch's processing takes longer than 10 seconds,
 then the next batch's jobs will stay queued.
 This can be changed with an experimental Spark property
 spark.streaming.concurrentJobs which is by default set to 1. Its not
 currently documented (maybe I should add it).
 The reason it is set to 1 is that concurrent jobs can potentially lead
 to weird sharing of resources and which can make it hard to debug the
 whether there is sufficient resources in the system to process the ingested
 data fast enough. With only 1 job running at a time, it is easy to see that
 if batch processing time < batch interval, then the system will be stable.
 Granted that this may not be the most efficient use of resources under
 certain conditions. We definitely hope to improve this in the future.


 Copied from TD's answer written in SO
 http://stackoverflow.com/questions/23528006/how-jobs-are-assigned-to-executors-in-spark-streaming
 .

 Non-receiver based streaming for example you can say are the fileStream,
 directStream ones. You can read a bit of information from here
 https://spark.apache.org/docs/1.3.1/streaming-kafka-integration.html

 Thanks
 Best Regards

 On Tue, May 19, 2015 at 2:13 PM, Shushant Arora 
 shushantaror...@gmail.com wrote:

 Thanks Akhil.
 When I don't  set spark.streaming.concurrentJobs to true. Will the all
 pending jobs starts one by one after 1 jobs completes,or it does not
 creates jobs which could not be started at its desired interval.

 And Whats the difference and usage of Receiver vs non-receiver based
 streaming. Is there any documentation for that?

 On Tue, May 19, 2015 at 1:35 PM, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 It will be a single job running at a time by default (you can also
 configure the spark.streaming.concurrentJobs to run jobs parallel which is
 not recommended to put in production).

 Now, your batch duration being 1 sec and processing time being 2
 minutes, if you are using a receiver based streaming then ideally those
 receivers will keep on receiving data while the job is running (which will
 accumulate in memory if you set StorageLevel as MEMORY_ONLY and end up in
 block not found exceptions as spark drops some blocks which are yet to
 process to accumulate new blocks). If you are using a non-receiver based
 approach, you will not have this problem of dropping blocks.

 Ideally, if your data is small and you have enough memory to hold your
 data then it will run smoothly without any issues.

 Thanks
 Best Regards

 On Tue, May 19, 2015 at 1:23 PM, Shushant Arora 
 shushantaror...@gmail.com wrote:

 What happnes if in a streaming application one job is not yet finished
 and stream interval reaches. Does it starts next job or wait for first to
 finish and rest jobs will keep on accumulating in queue.


 Say I have a streaming application with stream interval of 1 sec, but
 my job takes 2 min to process 1 sec stream , what will happen ?  At any
 time there will be only one job running or multiple ?








Multi user setup and saving a DataFrame / RDD to a network exported file system

2015-05-19 Thread Tomasz Fruboes

Dear Experts,

 we have a spark cluster (standalone mode) in which master and workers 
are started from root account. Everything runs correctly to the point 
when we try doing operations such as


dataFrame.select("name", "age").save(ofile, "parquet")

or

rdd.saveAsPickleFile(ofile)

, where ofile is a path on a network-exported filesystem (visible on all 
nodes; in our case this is Lustre, and I guess on NFS the effect would be similar).


 Unsurprisingly, temp files created on workers are owned by root, which 
then leads to a crash (see [1] below). Is there a solution/workaround 
for this (e.g. controlling the file creation mode of the temporary files)?


Cheers,
 Tomasz


ps I've tried to google this problem; there are a couple of similar reports, but no 
clear answer/solution found.


ps2 For completeness - running master/workers as a regular user solves 
the problem only for the given user. For other users submitting to this 
master the result is given in [2] below



[0] Cluster details:
Master/workers: centos 6.5
Spark 1.3.1 prebuilt for hadoop 2.4 (same behaviour for the 2.6 build)


[1]
##
   File 
/mnt/home/tfruboes/2015.05.SparkLocal/spark-1.3.1-bin-hadoop2.4/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py, 
line 300, in get_return_value

py4j.protocol.Py4JJavaError: An error occurred while calling o27.save.
: java.io.IOException: Failed to rename 
DeprecatedRawLocalFileStatus{path=file:/mnt/lustre/bigdata/med_home/tmp/test19EE/namesAndAges.parquet2/_temporary/0/task_201505191540_0009_r_01/part-r-2.parquet; 
isDirectory=false; length=534; replication=1; blocksize=33554432; 
modification_time=1432042832000; access_time=0; owner=; group=; 
permission=rw-rw-rw-; isSymlink=false} to 
file:/mnt/lustre/bigdata/med_home/tmp/test19EE/namesAndAges.parquet2/part-r-2.parquet
at 
org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.mergePaths(FileOutputCommitter.java:346)
at 
org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.mergePaths(FileOutputCommitter.java:362)
at 
org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJob(FileOutputCommitter.java:310)
at 
parquet.hadoop.ParquetOutputCommitter.commitJob(ParquetOutputCommitter.java:43)
at 
org.apache.spark.sql.parquet.ParquetRelation2.insert(newParquet.scala:690)
at 
org.apache.spark.sql.parquet.DefaultSource.createRelation(newParquet.scala:129)
at 
org.apache.spark.sql.sources.ResolvedDataSource$.apply(ddl.scala:240)

at org.apache.spark.sql.DataFrame.save(DataFrame.scala:1196)
at org.apache.spark.sql.DataFrame.save(DataFrame.scala:1181)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

at java.lang.reflect.Method.invoke(Method.java:606)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
at 
py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)

at py4j.Gateway.invoke(Gateway.java:259)
at 
py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)

at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:207)
at java.lang.Thread.run(Thread.java:745)
##



[2]
##
15/05/19 14:45:19 WARN TaskSetManager: Lost task 0.0 in stage 2.0 (TID 
3, wn23023.cis.gov.pl): java.io.IOException: Mkdirs failed to create 
file:/mnt/lustre/bigdata/med_home/tmp/test18/namesAndAges.parquet2/_temporary/0/_temporary/attempt_201505191445_0009_r_00_0
	at 
org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:438)
	at 
org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:424)

at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:906)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:887)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:784)
at parquet.hadoop.ParquetFileWriter.init(ParquetFileWriter.java:154)
	at 
parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:279)
	at 
parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:252)
	at 
org.apache.spark.sql.parquet.ParquetRelation2.org$apache$spark$sql$parquet$ParquetRelation2$$writeShard$1(newParquet.scala:667)
	at 
org.apache.spark.sql.parquet.ParquetRelation2$$anonfun$insert$2.apply(newParquet.scala:689)
	at 
org.apache.spark.sql.parquet.ParquetRelation2$$anonfun$insert$2.apply(newParquet.scala:689)

at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
at org.apache.spark.scheduler.Task.run(Task.scala:64)
at 

Re: Wish for 1.4: upper bound on # tasks in Mesos

2015-05-19 Thread Matei Zaharia
Hey Tom,

Are you using the fine-grained or coarse-grained scheduler? For the 
coarse-grained scheduler, there is a spark.cores.max config setting that will 
limit the total # of cores it grabs. This was there in earlier versions too.

Matei
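
For reference, a sketch of that setting in coarse-grained mode (the Mesos master
URL is a placeholder and 32 is just an example cap):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setMaster("mesos://zk://...")          // placeholder Mesos master
  .set("spark.mesos.coarse", "true")      // coarse-grained mode
  .set("spark.cores.max", "32")           // cap the total cores this job grabs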

 On May 19, 2015, at 12:39 PM, Thomas Dudziak tom...@gmail.com wrote:
 
 I read the other day that there will be a fair number of improvements in 1.4 
 for Mesos. Could I ask for one more (if it isn't already in there): a 
 configurable limit for the number of tasks for jobs run on Mesos ? This would 
 be a very simple yet effective way to prevent a job dominating the cluster.
 
 cheers,
 Tom
 


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



PanTera Big Data Visualization built with Spark

2015-05-19 Thread Cyrus Handy
Hi,

Can you please add us to the list of Spark Users

Org: PanTera
URL: http://pantera.io
Components we are using:

   - PanTera uses direct access to the Spark Scala API
   - Spark Core: SparkContext, JavaSparkContext, SparkConf, RDD, JavaRDD,
     Accumulable, AccumulableParam, Accumulator, AccumulatorParam,
     StorageLevel, Broadcast, HashPartitioner, Logging, KryoRegistrator,
     NewHadoopRDD, UnionRDD
   - We also use SharedSparkContext for testing, but we've made a copy
   - SparkSQL: SQLContext, JavaSQLContext, ByteType, IntType, LongType,
     FloatType, DoubleType, BooleanType, StringType, ArrayType,
     TimestampType, StructField, StructType, Row, GenericRow
   - Spark Streaming: we have done so in the past, and will again in the future,
     but it is not currently supported (when used: StreamingContext, DStream, Time)
   - GraphX: Graph, PartitionStrategy, Edge, EdgeTriplet, VertexRDD,
     graphToGraphOps

Use Case:

PanTera is a tool for exploring large datasets. It uses Spark to create XY
and geographic scatterplots from millions to billions of datapoints.

Please let me know if you have any questions.

Thanks very much,

Cyrus Handy
PanTera http://pantera.io Product Manager
Uncharted™ http://uncharted.software (Formerly Oculus Info Inc.)
Direct: 416-203-3003 x232
Mobile: 416-821-3025


Re: Wish for 1.4: upper bound on # tasks in Mesos

2015-05-19 Thread Thomas Dudziak
I'm using fine-grained for a multi-tenant environment which is why I would
welcome the limit of tasks per job :)

cheers,
Tom

On Tue, May 19, 2015 at 10:05 AM, Matei Zaharia matei.zaha...@gmail.com
wrote:

 Hey Tom,

 Are you using the fine-grained or coarse-grained scheduler? For the
 coarse-grained scheduler, there is a spark.cores.max config setting that
 will limit the total # of cores it grabs. This was there in earlier
 versions too.

 Matei

  On May 19, 2015, at 12:39 PM, Thomas Dudziak tom...@gmail.com wrote:
 
  I read the other day that there will be a fair number of improvements in
 1.4 for Mesos. Could I ask for one more (if it isn't already in there): a
 configurable limit for the number of tasks for jobs run on Mesos ? This
 would be a very simple yet effective way to prevent a job dominating the
 cluster.
 
  cheers,
  Tom
 




Wish for 1.4: upper bound on # tasks in Mesos

2015-05-19 Thread Thomas Dudziak
I read the other day that there will be a fair number of improvements in
1.4 for Mesos. Could I ask for one more (if it isn't already in there): a
configurable limit for the number of tasks for jobs run on Mesos ? This
would be a very simple yet effective way to prevent a job dominating the
cluster.

cheers,
Tom


Code error

2015-05-19 Thread Ricardo Goncalves da Silva
Hi,

Can anybody see what's wrong in this piece of code:


./bin/spark-shell --num-executors 2 --executor-memory 512m --master yarn-client
import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.mllib.linalg.Vectors


val data = sc.textFile("/user/p_loadbd/fraude5.csv").map(x =>
  x.toLowerCase.split(',')).map(x => x(0) + "," + x(1))
val header = data.first()
val filter_data = data.filter(x => x != header)
// Likely culprit: the original builds parsedData from `data` instead of
// `filter_data`, so the header row is still included and toDouble fails on it.
val parsedData = filter_data.map(s =>
  Vectors.dense(s.split(',').map(_.toDouble))).cache()

val numClusters = 2
val numIterations = 20
val clusters = KMeans.train(parsedData, numClusters, numIterations)

val WSSSE = clusters.computeCost(parsedData)
println("Within Set Sum of Squared Errors = " + WSSSE)

Thanks.



Ricardo Goncalves da Silva
Lead Data Scientist | Seção de Desenvolvimento de Sistemas de
Business Intelligence - Projetos de Inovação | IDPB02
Av. Eng. Luis Carlos Berrini, 1.376 - 7º - 04571-000 - SP
ricardog.si...@telefonica.commailto:ricardog.si...@telefonica.com | 
www.telefonica.com.brhttp://www.telefonica.com.br/
Tel +55 11 3430 4955 | Cel +55 11 94292 9526







The information contained in this transmission is privileged and confidential 
information intended only for the use of the individual or entity named above. 
If the reader of this message is not the intended recipient, you are hereby 
notified that any dissemination, distribution or copying of this communication 
is strictly prohibited. If you have received this transmission in error, do not 
read it. Please immediately reply to the sender that you have received this 
communication in error and then delete it.



Re: spark streaming doubt

2015-05-19 Thread Dibyendu Bhattacharya
Just to add, there is a Receiver based Kafka consumer which uses Kafka Low
Level Consumer API.

http://spark-packages.org/package/dibbhatt/kafka-spark-consumer


Regards,
Dibyendu

On Tue, May 19, 2015 at 9:00 PM, Akhil Das ak...@sigmoidanalytics.com
wrote:


 On Tue, May 19, 2015 at 8:10 PM, Shushant Arora shushantaror...@gmail.com
  wrote:

 So for Kafka+spark streaming, Receiver based streaming used highlevel api
 and non receiver based streaming used low level api.

 1.In high level receiver based streaming does it registers consumers at
 each job start(whenever a new job is launched by streaming application say
 at each second)?


 ​- Receiver based streaming will always have the receiver running
 parallel while your job is running, So by default for every 200ms
 (spark.streaming.blockInterval) the receiver will generate a block of data
 which is read from Kafka.
 ​


 2.No of executors in highlevel receiver based jobs will always equal to
 no of partitions in topic ?


 ​- Not sure from where did you came up with this. For the non stream
 based one, i think the number of partitions in spark will be equal to the
 number of kafka partitions for the given topic.
 ​


 3.Will data from a single topic be consumed by executors in parllel or
 only one receiver consumes in multiple threads and assign to executors in
 high level receiver based approach ?

 ​- They will consume the data parallel.​ For the receiver based
 approach, you can actually specify the number of receiver that you want to
 spawn for consuming the messages.




 On Tue, May 19, 2015 at 2:38 PM, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 spark.streaming.concurrentJobs takes an integer value, not boolean. If
 you set it as 2 then 2 jobs will run parallel. Default value is 1 and the
 next job will start once it completes the current one.


 Actually, in the current implementation of Spark Streaming and under
 default configuration, only one job is active (i.e. under execution) at any
 point of time. So if one batch's processing takes longer than 10 seconds,
 then the next batch's jobs will stay queued.
 This can be changed with an experimental Spark property
 spark.streaming.concurrentJobs which is by default set to 1. Its not
 currently documented (maybe I should add it).
 The reason it is set to 1 is that concurrent jobs can potentially lead
 to weird sharing of resources and which can make it hard to debug the
 whether there is sufficient resources in the system to process the ingested
 data fast enough. With only 1 job running at a time, it is easy to see that
 if batch processing time < batch interval, then the system will be stable.
 Granted that this may not be the most efficient use of resources under
 certain conditions. We definitely hope to improve this in the future.


 Copied from TD's answer written in SO
 http://stackoverflow.com/questions/23528006/how-jobs-are-assigned-to-executors-in-spark-streaming
 .

 Non-receiver based streaming for example you can say are the fileStream,
 directStream ones. You can read a bit of information from here
 https://spark.apache.org/docs/1.3.1/streaming-kafka-integration.html

 Thanks
 Best Regards

 On Tue, May 19, 2015 at 2:13 PM, Shushant Arora 
 shushantaror...@gmail.com wrote:

 Thanks Akhil.
 When I don't  set spark.streaming.concurrentJobs to true. Will the all
 pending jobs starts one by one after 1 jobs completes,or it does not
 creates jobs which could not be started at its desired interval.

 And Whats the difference and usage of Receiver vs non-receiver based
 streaming. Is there any documentation for that?

 On Tue, May 19, 2015 at 1:35 PM, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 It will be a single job running at a time by default (you can also
 configure the spark.streaming.concurrentJobs to run jobs parallel which is
 not recommended to put in production).

 Now, your batch duration being 1 sec and processing time being 2
 minutes, if you are using a receiver based streaming then ideally those
 receivers will keep on receiving data while the job is running (which will
 accumulate in memory if you set StorageLevel as MEMORY_ONLY and end up in
 block not found exceptions as spark drops some blocks which are yet to
 process to accumulate new blocks). If you are using a non-receiver based
 approach, you will not have this problem of dropping blocks.

 Ideally, if your data is small and you have enough memory to hold your
 data then it will run smoothly without any issues.

 Thanks
 Best Regards

 On Tue, May 19, 2015 at 1:23 PM, Shushant Arora 
 shushantaror...@gmail.com wrote:

 What happnes if in a streaming application one job is not yet
 finished and stream interval reaches. Does it starts next job or wait for
 first to finish and rest jobs will keep on accumulating in queue.


 Say I have a streaming application with stream interval of 1 sec, but
 my job takes 2 min to process 1 sec stream , what will happen ?  At any
 time there will be only one job running or 

Re: Decision tree: categorical variables

2015-05-19 Thread Ram Sriharsha
Hi Keerthi

As Xiangrui mentioned in his reply, the categorical variables are assumed
to be encoded as integers between 0 and k - 1, where k is the value you
pass in the category info map. So you will need to handle this
during parsing (your columns 3 and 6 need to be converted into ints in the
right range).
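
A small sketch of what that looks like end to end with MLlib's DecisionTree
(the arities 4 and 7 for columns 3 and 6, and the toy rows, are made-up
placeholders; substitute your real category counts and parsed data):

  import org.apache.spark.{SparkConf, SparkContext}
  import org.apache.spark.mllib.linalg.Vectors
  import org.apache.spark.mllib.regression.LabeledPoint
  import org.apache.spark.mllib.tree.DecisionTree

  val sc = new SparkContext(new SparkConf().setAppName("DecisionTreeExample"))

  // feature 3 has 4 categories (encoded 0..3), feature 6 has 7 (encoded 0..6)
  val categoricalFeaturesInfo = Map(3 -> 4, 6 -> 7)

  // toy rows: label plus 7 features, columns 3 and 6 already encoded as ints
  val data = sc.parallelize(Seq(
    LabeledPoint(0.0, Vectors.dense(1.2, 0.4, 3.1, 0.0, 5.5, 2.2, 6.0)),
    LabeledPoint(1.0, Vectors.dense(0.7, 1.9, 2.0, 3.0, 4.1, 0.3, 1.0))))

  // args: numClasses = 2, impurity = "gini", maxDepth = 5, maxBins = 32
  val model = DecisionTree.trainClassifier(data, 2, categoricalFeaturesInfo,
    "gini", 5, 32)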

Ram

On Tue, May 19, 2015 at 5:45 AM, Keerthi keerthi.reddy1...@gmail.com
wrote:

 Hi ,

 can you pls share how you resolved the parsing issue. It would be of great
 help...

 Thanks.



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Decision-tree-categorical-variables-tp12433p22943.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Broadcast variables can be rebroadcast?

2015-05-19 Thread N B
Thanks Imran. It does help clarify. I believe I had it right all along then
but was confused by documentation talking about never changing the
broadcasted variables.

I've tried it on a local mode process till now and does seem to work as
intended. When (and if !) we start running on a real cluster, I hope this
holds up.

Thanks
NB


On Tue, May 19, 2015 at 6:25 AM, Imran Rashid iras...@cloudera.com wrote:

 hmm, I guess it depends on the way you look at it.  In a way, I'm saying
 that spark does *not* have any built in auto-re-broadcast if you try to
 mutate a broadcast variable.  Instead, you should create something new, and
 just broadcast it separately.  Then just have all the code you have
 operating on your RDDs look at the new broadcast variable.

 But I guess there is another way to look at it -- you are creating new
 broadcast variables each time, but they all point to the same underlying
 mutable data structure.  So in a way, you are rebroadcasting the same
 underlying data structure.

 Let me expand my example from earlier a little bit more:


 def oneIteration(myRDD: RDD[...], myBroadcastVar: Broadcast[...]): Unit = {
  ...
 }

 // this is a val, because the data structure itself is mutable
 val myMutableDataStructure = ...
 // this is a var, because you will create new broadcasts
 var myBroadcast = sc.broadcast(myMutableDataStructure)
 (0 to 20).foreach { iteration =>
   oneIteration(myRDD, myBroadcast)
   // update your mutable data structure in place
   myMutableDataStructure.update(...)
   // ... but that doesn't affect the broadcast variables living out on the
 cluster, so we need to
   // create a new one

   // this line is not required -- the broadcast var will automatically get
 unpersisted when a gc
   // cleans up the old broadcast on the driver, but I'm including this
 here for completeness,
   // in case you want to more proactively clean up old blocks if you are
 low on space
   myBroadcast.unpersist()

   // now we create a new broadcast which has the updated data in our
 mutable data structure
   myBroadcast = sc.broadcast(myMutableDataStructure)
 }


 hope this clarifies things!

 Imran

 On Tue, May 19, 2015 at 3:06 AM, N B nb.nos...@gmail.com wrote:

 Hi Imran,

 If I understood you correctly, you are suggesting to simply call
 broadcast again from the driver program. This is exactly what I am hoping
 will work as I have the Broadcast data wrapped up and I am indeed
 (re)broadcasting the wrapper over again when the underlying data changes.
 However, documentation seems to suggest that one cannot re-broadcast. Is my
 understanding accurate?

 Thanks
 NB


 On Mon, May 18, 2015 at 6:24 PM, Imran Rashid iras...@cloudera.com
 wrote:

 Rather than updating the broadcast variable, can't you simply create a
 new one?  When the old one can be gc'ed in your program, it will also get
 gc'ed from spark's cache (and all executors).

 I think this will make your code *slightly* more complicated, as you
 need to add in another layer of indirection for which broadcast variable to
 use, but not too bad.  Eg., from

 var myBroadcast = sc.broadcast( ...)
 (0 to 20).foreach { iteration =>
   //  ... some rdd operations that involve myBroadcast ...
   myBroadcast.update(...) // wrong! don't update a broadcast variable
 }

 instead do something like:

 def oneIteration(myRDD: RDD[...], myBroadcastVar: Broadcast[...]): Unit
 = {
  ...
 }

 var myBroadcast = sc.broadcast(...)
 (0 to 20).foreach { iteration =>
   oneIteration(myRDD, myBroadcast)
   myBroadcast = sc.broadcast(...) // reassign to a NEW broadcast here,
 with whatever you need to update it
 }

 On Sat, May 16, 2015 at 2:01 AM, N B nb.nos...@gmail.com wrote:

 Thanks Ayan. Can we rebroadcast after updating in the driver?

 Thanks
 NB.


 On Fri, May 15, 2015 at 6:40 PM, ayan guha guha.a...@gmail.com wrote:

 Hi

 A broadcast variable is shipped, the first time it is accessed in a
 transformation, to the executors used by that transformation. It will NOT be
 updated subsequently, even if the value has changed. However, a new value
 will be shipped to any new executor that comes into play after the value has
 changed. For this reason, changing the value of a broadcast variable is not a
 good idea as it can create inconsistency within the cluster. From the
 documentation:

  In addition, the object v should not be modified after it is
 broadcast in order to ensure that all nodes get the same value of the
 broadcast variable


 On Sat, May 16, 2015 at 10:39 AM, N B nb.nos...@gmail.com wrote:

 Thanks Ilya. Does one have to call broadcast again once the
 underlying data is updated in order to get the changes visible on all 
 nodes?

 Thanks
 NB


 On Fri, May 15, 2015 at 5:29 PM, Ilya Ganelin ilgan...@gmail.com
 wrote:

 The broadcast variable is like a pointer. If the underlying data
 changes then the changes will be visible throughout the cluster.
 On Fri, May 15, 2015 at 5:18 PM NB nb.nos...@gmail.com wrote:

 Hello,

 Once a broadcast variable is created using
 

Re: Reading Binary files in Spark program

2015-05-19 Thread Akhil Das
Try something like:

JavaPairRDD<IntWritable, Text> output = sc.newAPIHadoopFile(inputDir,
  org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat.class,
  IntWritable.class,
  Text.class, new Job().getConfiguration());

With the type of input format that you require.
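
As a rough Scala sketch of the same idea for the (Text, BytesWritable)
sequence files described in the mail below, copying the Hadoop writables into
serializable types right after reading (the path is a placeholder):

  import org.apache.hadoop.io.{BytesWritable, Text}
  import org.apache.spark.{SparkConf, SparkContext}

  val sc = new SparkContext(new SparkConf().setAppName("ReadBinarySequenceFile"))

  // read the (Text, BytesWritable) pairs and immediately copy the contents of
  // the reused, non-serializable writables into plain (String, Array[Byte])
  val images = sc
    .sequenceFile("/path/to/sequence/files", classOf[Text], classOf[BytesWritable])
    .map { case (name, bytes) => (name.toString, bytes.copyBytes()) }

  // collect/take now works, since (String, Array[Byte]) is serializable
  images.take(5).foreach { case (name, data) => println(s"$name: ${data.length} bytes") }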

Thanks
Best Regards

On Tue, May 19, 2015 at 3:57 PM, Tapan Sharma tapan.sha...@gmail.com
wrote:

 Hi Team,

 I am new to Spark and learning.
 I am trying to read image files into a Spark job. This is how I am doing it:
 Step 1. Created sequence files with FileName as Key and Binary image as
 value. i.e.  Text and BytesWritable.
 I am able to read these sequence files into Map Reduce programs.

 Step 2.
 I understand that Text and BytesWritable are not serializable; therefore, I
 read the sequence file in Spark as follows:

 SparkConf sparkConf = new SparkConf().setAppName(JavaSequenceFile);
 JavaSparkContext ctx = new JavaSparkContext(sparkConf);
 JavaPairRDD<String, Byte> seqFiles = ctx.sequenceFile(args[0],
 String.class, Byte.class);
 final List<Tuple2<String, Byte>> tuple2s = seqFiles.collect();



 The moment I try to call the collect() method to get the keys of the sequence
 file, the following exception is thrown.

 Can anyone help me understand why the collect() method is failing? If I use
 toArray() on the seqFiles object, I get the same call stack.

 Regards
 Tapan



 java.io.NotSerializableException: org.apache.hadoop.io.Text
 at
 java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
 at
 java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
 at
 java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
 at

 java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
 at
 java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
 at
 java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1377)
 at
 java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1173)
 at
 java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
 at

 org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
 at

 org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:73)
 at
 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:206)
 at

 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
 at

 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 at java.lang.Thread.run(Thread.java:745)
 2015-05-19 15:15:03,705 ERROR [task-result-getter-0]
 scheduler.TaskSetManager (Logging.scala:logError(75)) - Task 0.0 in stage
 0.0 (TID 0) had a not serializable result: org.apache.hadoop.io.Text; not
 retrying
 2015-05-19 15:15:03,731 INFO  [task-result-getter-0]
 scheduler.TaskSchedulerImpl (Logging.scala:logInfo(59)) - Removed TaskSet
 0.0, whose tasks have all completed, from pool
 2015-05-19 15:15:03,739 INFO  [sparkDriver-akka.actor.default-dispatcher-2]
 scheduler.TaskSchedulerImpl (Logging.scala:logInfo(59)) - Cancelling stage
 0
 2015-05-19 15:15:03,747 INFO  [main] scheduler.DAGScheduler
 (Logging.scala:logInfo(59)) - Job 0 failed: collect at
 JavaSequenceFile.java:44, took 4.421397 s
 Exception in thread main org.apache.spark.SparkException: Job aborted due
 to stage failure: Task 0.0 in stage 0.0 (TID 0) had a not serializable
 result: org.apache.hadoop.io.Text
 at
 org.apache.spark.scheduler.DAGScheduler.org
 $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1214)
 at

 org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1203)
 at

 org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1202)
 at

 scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
 at
 scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
 at
 org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1202)
 at

 org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
 at

 org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
 at scala.Option.foreach(Option.scala:236)
 at

 org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:696)
 at

 org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1420)
 at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
 at akka.actor.ActorCell.invoke(ActorCell.scala:456)
 at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
 at akka.dispatch.Mailbox.run(Mailbox.scala:219)
 at

 

Re: group by and distinct performance issue

2015-05-19 Thread Todd Nist
You may want to look at this tooling for helping identify performance
issues and bottlenecks:

https://github.com/kayousterhout/trace-analysis

I believe this is slated to become part of the web ui in the 1.4 release,
in fact based on the status of the JIRA,
https://issues.apache.org/jira/browse/SPARK-6418, looks like it is complete.


On Tue, May 19, 2015 at 3:56 AM, Akhil Das ak...@sigmoidanalytics.com
wrote:

 Hi Peer,

 If you open the driver UI (running on port 4040) you can see the stages
 and the tasks happening inside it. Best way to identify the bottleneck for
 a stage is to see if there's any time spending on GC, and how many tasks
 are there per stage (it should be at least the total # of cores to achieve max
 parallelism). Also, for each task you can take how long it takes etc. into
 consideration.

 Thanks
 Best Regards

 On Tue, May 19, 2015 at 12:58 PM, Peer, Oded oded.p...@rsa.com wrote:

  I am running Spark over Cassandra to process a single table.

 My task reads a single day's worth of data from the table and performs 50
 group by and distinct operations, counting distinct userIds by different
 grouping keys.

 My code looks like this:



JavaRDD<Row> rdd = sc.parallelize().mapPartitions().cache() // reads
 the data from the table

for each groupingKey {

   JavaPairRDD<GroupingKey, UserId> groupByRdd = rdd.mapToPair();

   JavaPairRDD<GroupingKey, Long> countRdd =
 groupByRdd.distinct().mapToPair().reduceByKey() // counts distinct values
 per grouping key

}



 The distinct() stage takes about 2 minutes for every groupByValue, and my
 task takes well over an hour to complete.

 My cluster has 4 nodes and 30 GB of RAM per Spark process, the table size
 is 4 GB.



 How can I identify the bottleneck more accurately? Is it caused by
 shuffling data?

 How can I improve the performance?



 Thanks,

 Oded





Re: How to use spark to access HBase with Security enabled

2015-05-19 Thread Ted Yu
Please take a look at:
http://hbase.apache.org/book.html#_client_side_configuration_for_secure_operation
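
A very rough sketch of the client-side pieces that page describes, as they
might look in the Spark driver (the principal and keytab path are
hypothetical, and on a real cluster the executors also need access to the
Kerberos credentials):

  import org.apache.hadoop.hbase.HBaseConfiguration
  import org.apache.hadoop.security.UserGroupInformation

  val hbaseConf = HBaseConfiguration.create()
  // client-side security settings, normally coming from hbase-site.xml/core-site.xml
  hbaseConf.set("hadoop.security.authentication", "kerberos")
  hbaseConf.set("hbase.security.authentication", "kerberos")

  // log in explicitly from a keytab instead of relying on the ticket cache
  UserGroupInformation.setConfiguration(hbaseConf)
  UserGroupInformation.loginUserFromKeytab("user@EXAMPLE.COM", "/path/to/user.keytab")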

Cheers

On Tue, May 19, 2015 at 5:23 AM, donhoff_h 165612...@qq.com wrote:


 The principal is sp...@bgdt.dev.hrb. It is the user that I used to run my
 spark programs. I am sure I have run the kinit command to make it take
 effect. And I also used the HBase Shell to verify that this user has the
 right to scan and put the tables in HBase.

 Now I still have no idea how to solve this problem. Can anybody help me to
 figure it out? Many Thanks!

 -- Original Message --
 *From:* yuzhihong; yuzhih...@gmail.com
 *Sent:* Tuesday, May 19, 2015, 7:55 PM
 *To:* donhoff_h 165612...@qq.com
 *Cc:* user user@spark.apache.org
 *Subject:* Re: How to use spark to access HBase with Security enabled

 Which user did you run your program as ?

 Have you granted proper permission on hbase side ?

 You should also check master log to see if there was some clue.

 Cheers



 On May 19, 2015, at 2:41 AM, donhoff_h 165612...@qq.com wrote:

 Hi, experts.

 I ran the HBaseTest program which is an example from the Apache Spark
 source code to learn how to use spark to access HBase. But I met the
 following exception:
 Exception in thread main
 org.apache.hadoop.hbase.client.RetriesExhaustedException: Failed after
 attempts=36, exceptions:
 Tue May 19 16:59:11 CST 2015, null, java.net.SocketTimeoutException:
 callTimeout=6, callDuration=68648: row 'spark_t01,,00' on
 table 'hbase:meta' at region=hbase:meta,,1.1588230740,
 hostname=bgdt01.dev.hrb,16020,1431412877700, seqNum=0

 I also checked the RegionServer Log of the host bgdt01.dev.hrb listed in
 the above exception. I found a few entries like the following one:
 2015-05-19 16:59:11,143 DEBUG
 [RpcServer.reader=2,bindAddress=bgdt01.dev.hrb,port=16020] ipc.RpcServer:
 RpcServer.listener,port=16020: Caught exception while
 reading:Authentication is required

 The above entry did not point to my program clearly. But the time is very
 near. Since my hbase version is HBase 1.0.0 and I have security enabled, I
 suspect the exception was caused by Kerberos authentication. But I am
 not sure.

 Does anybody know if my guess is right? And if I am right, could anybody
 tell me how to set up Kerberos authentication in a spark program? I don't know
 how to do it. I already checked the API doc, but did not find any useful
 API. Many Thanks!

 By the way, my spark version is 1.3.0. I also paste the code of
 HBaseTest in the following:
 ***Source Code**
 object HBaseTest {
   def main(args: Array[String]) {
 val sparkConf = new SparkConf().setAppName("HBaseTest")
 val sc = new SparkContext(sparkConf)
 val conf = HBaseConfiguration.create()
 conf.set(TableInputFormat.INPUT_TABLE, args(0))

 // Initialize hBase table if necessary
 val admin = new HBaseAdmin(conf)
 if (!admin.isTableAvailable(args(0))) {
   val tableDesc = new HTableDescriptor(args(0))
   admin.createTable(tableDesc)
 }

 val hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
   classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
   classOf[org.apache.hadoop.hbase.client.Result])

 hBaseRDD.count()

 sc.stop()
   }
 }




Re: Wish for 1.4: upper bound on # tasks in Mesos

2015-05-19 Thread Matei Zaharia
Yeah, this definitely seems useful there. There might also be some ways to cap 
the application in Mesos, but I'm not sure.

Matei

 On May 19, 2015, at 1:11 PM, Thomas Dudziak tom...@gmail.com wrote:
 
 I'm using fine-grained for a multi-tenant environment which is why I would 
 welcome the limit of tasks per job :)
 
 cheers,
 Tom
 
  On Tue, May 19, 2015 at 10:05 AM, Matei Zaharia matei.zaha...@gmail.com wrote:
 Hey Tom,
 
 Are you using the fine-grained or coarse-grained scheduler? For the 
 coarse-grained scheduler, there is a spark.cores.max config setting that will 
 limit the total # of cores it grabs. This was there in earlier versions too.
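  For reference, a minimal sketch of capping a coarse-grained Mesos app this way
  (the master URL and core count are placeholders):

    import org.apache.spark.{SparkConf, SparkContext}

    val conf = new SparkConf()
      .setAppName("CappedApp")                      // hypothetical app name
      .setMaster("mesos://zk://zkhost:2181/mesos")  // hypothetical Mesos master
      .set("spark.cores.max", "48")                 // total cores this app may grab
    val sc = new SparkContext(conf)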
 
 Matei
 
   On May 19, 2015, at 12:39 PM, Thomas Dudziak tom...@gmail.com wrote:
 
  I read the other day that there will be a fair number of improvements in 
  1.4 for Mesos. Could I ask for one more (if it isn't already in there): a 
  configurable limit for the number of tasks for jobs run on Mesos ? This 
  would be a very simple yet effective way to prevent a job dominating the 
  cluster.
 
  cheers,
  Tom
 
 
 



Re: spark streaming doubt

2015-05-19 Thread Shushant Arora
Thanks Akhil andDibyendu.

In high-level receiver-based streaming, do executors run on the receiver
nodes themselves to get data locality? Or is the data always transferred to
executor nodes, with the executor nodes differing in each run of the job while
the receiver nodes remain the same (same machines) throughout the life of the
streaming application unless a node failure happens?



On Tue, May 19, 2015 at 9:29 PM, Dibyendu Bhattacharya 
dibyendu.bhattach...@gmail.com wrote:

 Just to add, there is a Receiver based Kafka consumer which uses Kafka Low
 Level Consumer API.

 http://spark-packages.org/package/dibbhatt/kafka-spark-consumer


 Regards,
 Dibyendu

 On Tue, May 19, 2015 at 9:00 PM, Akhil Das ak...@sigmoidanalytics.com
 wrote:


 On Tue, May 19, 2015 at 8:10 PM, Shushant Arora 
 shushantaror...@gmail.com wrote:

 So for Kafka + Spark Streaming, receiver-based streaming uses the high-level
 API and non-receiver based streaming uses the low-level API.

 1. In high-level receiver-based streaming, does it register consumers at
 each job start (whenever a new job is launched by the streaming application,
 say every second)?


 ​- Receiver based streaming will always have the receiver running
 in parallel while your job is running. So by default, every 200ms
 (spark.streaming.blockInterval) the receiver will generate a block of data
 which is read from Kafka.
 ​


 2. Will the number of executors in high-level receiver-based jobs always be
 equal to the number of partitions in the topic?


 ​- Not sure from where did you came up with this. For the non stream
 based one, I think the number of partitions in Spark will be equal to the
 number of Kafka partitions for the given topic.
 ​


 3. Will data from a single topic be consumed by executors in parallel, or
 does only one receiver consume in multiple threads and assign to executors in
 the high-level receiver-based approach?

 ​- They will consume the data parallel.​ For the receiver based
 approach, you can actually specify the number of receivers that you want to
 spawn for consuming the messages.
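
 A short sketch of that last point for the receiver-based Kafka API, spawning
 several receivers and unioning their streams (ZooKeeper quorum, group id,
 topic and batch interval are placeholders):

   import org.apache.spark.SparkConf
   import org.apache.spark.streaming.{Seconds, StreamingContext}
   import org.apache.spark.streaming.kafka.KafkaUtils

   val ssc = new StreamingContext(new SparkConf().setAppName("MultiReceiver"), Seconds(2))

   val numReceivers = 3
   val kafkaStreams = (1 to numReceivers).map { _ =>
     KafkaUtils.createStream(ssc, "zkhost:2181", "my-consumer-group", Map("mytopic" -> 1))
   }
   // one unified DStream whose blocks are consumed in parallel downstream
   val unified = ssc.union(kafkaStreams)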




 On Tue, May 19, 2015 at 2:38 PM, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 spark.streaming.concurrentJobs takes an integer value, not a boolean. If
 you set it to 2 then 2 jobs will run in parallel. The default value is 1, and
 the next job will start once it completes the current one.


 Actually, in the current implementation of Spark Streaming and under the
 default configuration, only one job is active (i.e. under execution) at any
 point of time. So if one batch's processing takes longer than 10 seconds,
 then the next batch's jobs will stay queued.
 This can be changed with an experimental Spark property
 spark.streaming.concurrentJobs, which is by default set to 1. It's not
 currently documented (maybe I should add it).
 The reason it is set to 1 is that concurrent jobs can potentially lead
 to weird sharing of resources, which can make it hard to debug whether
 there are sufficient resources in the system to process the ingested
 data fast enough. With only 1 job running at a time, it is easy to see that
 if batch processing time < batch interval, then the system will be stable.
 Granted, this may not be the most efficient use of resources under
 certain conditions. We definitely hope to improve this in the future.


 Copied from TD's answer written in SO
 http://stackoverflow.com/questions/23528006/how-jobs-are-assigned-to-executors-in-spark-streaming
 .

 Examples of non-receiver based streaming are the fileStream and
 directStream ones. You can read a bit more information here
 https://spark.apache.org/docs/1.3.1/streaming-kafka-integration.html

 Thanks
 Best Regards

 On Tue, May 19, 2015 at 2:13 PM, Shushant Arora 
 shushantaror...@gmail.com wrote:

 Thanks Akhil.
 When I don't set spark.streaming.concurrentJobs to true, will all the
 pending jobs start one by one after the first job completes, or does it not
 create jobs which could not be started at their desired interval?

 And what's the difference and usage of receiver vs non-receiver based
 streaming? Is there any documentation for that?

 On Tue, May 19, 2015 at 1:35 PM, Akhil Das ak...@sigmoidanalytics.com
  wrote:

 It will be a single job running at a time by default (you can also
 configure spark.streaming.concurrentJobs to run jobs in parallel, which is
 not recommended in production).

 Now, with your batch duration being 1 sec and processing time being 2
 minutes, if you are using receiver-based streaming then ideally those
 receivers will keep on receiving data while the job is running (which will
 accumulate in memory if you set the StorageLevel as MEMORY_ONLY and end up
 in block not found exceptions, as Spark drops some blocks which are yet to
 be processed in order to accumulate new blocks). If you are using a
 non-receiver based approach, you will not have this problem of dropping blocks.

 Ideally, if your data is small and you have enough memory to hold
 your data then it will run smoothly without any issues.

 Thanks
 Best Regards

 On Tue, May 19, 2015 at 

Re: Getting the best parameter set back from CrossValidatorModel

2015-05-19 Thread Joseph Bradley
Hi Justin  Ram,

To clarify, PipelineModel.stages is not private[ml]; only the PipelineModel
constructor is private[ml].  So it's safe to use pipelineModel.stages as a
Spark user.

Ram's example looks good.  Btw, in Spark 1.4 (and the current master
build), we've made a number of improvements to Params and Pipelines, so
this should become easier to use!

Joseph

On Sun, May 17, 2015 at 10:17 PM, Justin Yip yipjus...@prediction.io
wrote:


 Thanks Ram.

 Your sample look is very helpful. (there is a minor bug that
 PipelineModel.stages is hidden under private[ml], just need a wrapper
 around it. :)

 Justin

 On Sat, May 16, 2015 at 10:44 AM, Ram Sriharsha sriharsha@gmail.com
 wrote:

 Hi Justin

 The CrossValidatorExample here
 https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/ml/CrossValidatorExample.scala
 is a good example of how to set up an ML Pipeline for extracting a model
 with the best parameter set.

 You set up the pipeline as in here:

 https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/ml/CrossValidatorExample.scala#L73

 This pipeline is treated as an estimator and wrapped into a Cross
 Validator to do grid search and return the model with the best parameters .
 Once you have trained the best model as in here

 https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/ml/CrossValidatorExample.scala#L93

 The result is a CrossValidatorModel which contains the best estimator
 (i.e. the best pipeline above) and you can extract the best pipeline and
 inquire its parameters as follows:

 // what are the best parameters?
 val bestPipelineModel = cvModel.bestModel.asInstanceOf[PipelineModel]
 val stages = bestPipelineModel.stages

 val hashingStage = stages(1).asInstanceOf[HashingTF]
 println(hashingStage.getNumFeatures)
 val lrStage = stages(2).asInstanceOf[LogisticRegressionModel]
 println(lrStage.getRegParam)



 Ram

 On Sat, May 16, 2015 at 3:17 AM, Justin Yip yipjus...@prediction.io
 wrote:

 Hello,

 I am using MLPipeline. I would like to extract the best parameter found
 by CrossValidator. But I cannot find much document about how to do it. Can
 anyone give me some pointers?

 Thanks.

 Justin

 --
 View this message in context: Getting the best parameter set back from
 CrossValidatorModel
 http://apache-spark-user-list.1001560.n3.nabble.com/Getting-the-best-parameter-set-back-from-CrossValidatorModel-tp22915.html
 Sent from the Apache Spark User List mailing list archive
 http://apache-spark-user-list.1001560.n3.nabble.com/ at Nabble.com.






Re: How to implement an Evaluator for a ML pipeline?

2015-05-19 Thread Xiangrui Meng
The documentation needs to be updated to state that higher metric
values are better (https://issues.apache.org/jira/browse/SPARK-7740).
I don't know why if you negate the return value of the Evaluator you
still get the highest regularization parameter candidate. Maybe you
should check the log messages from CrossValidator and see the average
metric values during cross validation. -Xiangrui
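
Given that higher metric values win, one way to build a working evaluator for
this ALS pipeline is to return a negated RMSE. A rough sketch against the same
Evaluator interface used in the snippet quoted below (the "rating" and
"prediction" column names are assumptions about the pipeline's output):

  import org.apache.spark.ml.Evaluator
  import org.apache.spark.ml.param.ParamMap
  import org.apache.spark.sql.DataFrame
  import org.apache.spark.sql.functions.col

  class NegatedRmseEvaluator extends Evaluator {
    override def evaluate(dataset: DataFrame, paramMap: ParamMap): Double = {
      val squaredErrors = dataset
        .select(col("rating").cast("double"), col("prediction").cast("double"))
        .rdd
        .map { row =>
          val err = row.getDouble(0) - row.getDouble(1)
          err * err
        }
      // CrossValidator keeps the model with the largest metric,
      // so return the negative RMSE (smaller error => larger metric)
      -math.sqrt(squaredErrors.mean())
    }
  }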

On Sat, May 9, 2015 at 12:15 PM, Stefan H. twel...@gmx.de wrote:
 Hello everyone,

 I am stuck with the (experimental, I think) API for machine learning
 pipelines. I have a pipeline with just one estimator (ALS) and I want it to
 try different values for the regularization parameter. Therefore I need to
 supply an Evaluator that returns a value of type Double. I guess this could
 be something like accuracy or mean squared error? The only implementation I
 found is BinaryClassificationEvaluator, and I did not understand the
 computation there.

 I could not find detailed documentation so I implemented a dummy Evaluator
 that just returns the regularization parameter:

   new Evaluator {
 def evaluate(dataset: DataFrame, paramMap: ParamMap): Double =
   paramMap.get(als.regParam).getOrElse(throw new Exception)
   }

 I just wanted to see whether the lower or higher value wins. On the
 resulting model I inspected the chosen regularization parameter this way:

   cvModel.bestModel.fittingParamMap.get(als.regParam)

 And it was the highest of my three regularization parameter candidates.
 Strange thing is, if I negate the return value of the Evaluator, that line
 still returns the highest regularization parameter candidate.

 So I am probably working with false assumptions. I'd be grateful if someone
 could point me to some documentation or examples, or has a few hints to
 share.

 Cheers,
 Stefan



 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/How-to-implement-an-Evaluator-for-a-ML-pipeline-tp22830.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Add to Powered by Spark page

2015-05-19 Thread Michal Klos
Hi,

We would like to be added to the Powered by Spark list:

organization name: Localytics
URL: http://eng.localytics.com/
a list of which Spark components you are using: Spark, Spark Streaming,
MLLib
a short description of your use case: Batch, real-time, and predictive
analytics driving our mobile app analytics and marketing automation product.

thanks,

M


Re: rdd.sample() methods very slow

2015-05-19 Thread Sean Owen
The way these files are accessed is inherently sequential-access. There
isn't a way to in general know where record N is in a file like this and
jump to it. So they must be read to be sampled.


On Tue, May 19, 2015 at 9:44 PM, Wang, Ningjun (LNG-NPV) 
ningjun.w...@lexisnexis.com wrote:

  Hi



 I have an RDD[Document] that contains 7 million objects and it is saved in
 file system as an object file. I want to get a random sample of about 70
 objects from it using the rdd.sample() method. It is very slow.





 val rdd : RDD[Document] =
 sc.objectFile[Document]("C:/temp/docs.obj").sample(false, 0.1D,
 0L).cache()

 val count = rdd.count()



 From Spark UI, I see spark is try to read the entire object files at the
 folder “C:/temp/docs.obj” which is about 29.7 GB. Of course this is very
 slow. Why does Spark try to read entire 7 million objects while I only need
 to return a random sample of 70 objects?



 Is there any efficient way to get a random sample of 70 objects without
 reading through the entire object files?



 Ningjun





Exception when using CLUSTER BY or ORDER BY

2015-05-19 Thread Thomas Dudziak
Under certain circumstances that I haven't yet been able to isolate, I get
the following error when doing a HQL query using HiveContext (Spark 1.3.1
on Mesos, fine-grained mode). Is this a known problem or should I file a
JIRA for it ?


org.apache.spark.SparkException: Can only zip RDDs with same number of
elements in each partition
at 
org.apache.spark.rdd.RDD$$anonfun$zip$1$$anon$1.hasNext(RDD.scala:746)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at 
org.apache.spark.util.random.SamplingUtils$.reservoirSampleAndCount(SamplingUtils.scala:56)
at 
org.apache.spark.RangePartitioner$$anonfun$8.apply(Partitioner.scala:259)
at 
org.apache.spark.RangePartitioner$$anonfun$8.apply(Partitioner.scala:257)
at org.apache.spark.rdd.RDD$$anonfun$15.apply(RDD.scala:647)
at org.apache.spark.rdd.RDD$$anonfun$15.apply(RDD.scala:647)
at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
at org.apache.spark.scheduler.Task.run(Task.scala:64)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)


Re: Naming an DF aggregated column

2015-05-19 Thread Michael Armbrust
customerDF.groupBy("state").agg(max($"discount").alias("newName"))

(or .as(...), both functions can take a String or a Symbol)

On Tue, May 19, 2015 at 2:11 PM, Cesar Flores ces...@gmail.com wrote:


 I would like to ask if there is a way of specifying the column name of a
 data frame aggregation. For example If I do:

 customerDF.groupBy("state").agg(max($"discount"))

 the name of my aggregated column will be: MAX('discount)

 Is there a way of changing the name of that column to something else on
 the fly, and not after performing the aggregation?


 thanks
 --
 Cesar Flores



Re: Word2Vec with billion-word corpora

2015-05-19 Thread Xiangrui Meng
With vocabulary size 4M and vector size 400, you need 400 * 4M = 1.6B
floats to store the model. That is about 6.4GB of raw data, before any
JVM overhead. We store the model on the
driver node in the current implementation. So I don't think it would
work. You might try increasing the minCount to decrease the vocabulary
size and reduce the vector size. I'm interested in learning the
trade-off between the model size and the model quality. If you have
done some experiments, please let me know. Thanks! -Xiangrui

On Wed, May 13, 2015 at 11:17 AM, Shilad Sen s...@macalester.edu wrote:
 Hi all,

 I'm experimenting with Spark's Word2Vec implementation for a relatively
 large corpus (5B words, vocabulary size 4M, 400-dimensional vectors). Has
 anybody had success running it at this scale?

 Thanks in advance for your guidance!

 -Shilad

 --
 Shilad W. Sen
 Associate Professor
 Mathematics, Statistics, and Computer Science Dept.
 Macalester College
 s...@macalester.edu
 http://www.shilad.com
 https://www.linkedin.com/in/shilad
 651-696-6273

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Hive 1.0 support in Spark

2015-05-19 Thread Kannan Rajah
Does Spark 1.3.1 support Hive 1.0? If not, which version of Spark will
start supporting Hive 1.0?

--
Kannan


Re: Discretization

2015-05-19 Thread Xiangrui Meng
Thanks for asking! We should improve the documentation. The sample
dataset is actually mimicking the MNIST digits dataset, where the
values are gray levels (0-255). So by dividing by 16, we want to map
it to 16 coarse bins for the gray levels. Actually, there is a bug in
the doc, we should convert the values to integer first before dividing
by 16. I created https://issues.apache.org/jira/browse/SPARK-7739 for
this issue. Welcome to submit a patch:) Thanks!
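
A sketch of what the corrected example might look like, producing 16
integer-valued bins (this is my reading of the fix, not necessarily the final
doc wording; data is an RDD[LabeledPoint] as in the snippet quoted below):

  import org.apache.spark.mllib.linalg.Vectors
  import org.apache.spark.mllib.regression.LabeledPoint

  // map gray levels 0-255 into 16 integer bins by flooring after the division
  val discretizedData = data.map { lp =>
    LabeledPoint(lp.label,
      Vectors.dense(lp.features.toArray.map { x => (x / 16).floor }))
  }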

Best,
Xiangrui

On Thu, May 7, 2015 at 9:20 PM, spark_user_2015 li...@adobe.com wrote:
 The Spark documentation shows the following example code:

 // Discretize data in 16 equal bins since ChiSqSelector requires categorical
 features
 val discretizedData = data.map { lp =>
   LabeledPoint(lp.label, Vectors.dense(lp.features.toArray.map { x => x / 16
 } ) )
 }

 I'm sort of missing why x / 16 is considered a discretization approach
 here.

 [https://spark.apache.org/docs/latest/mllib-feature-extraction.html#feature-selection]



 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/Discretization-tp22811.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Naming an DF aggregated column

2015-05-19 Thread Cesar Flores
I would like to ask if there is a way of specifying the column name of a
data frame aggregation. For example If I do:

customerDF.groupBy("state").agg(max($"discount"))

the name of my aggregated column will be: MAX('discount)

Is there a way of changing the name of that column to something else on the
fly, and not after performing the aggregation?


thanks
-- 
Cesar Flores


Re: Does Python 2.7 have to be installed on every cluster node?

2015-05-19 Thread Davies Liu
PySpark works with CPython by default, and you can specify which
version of Python to use by:

PYSPARK_PYTHON=path/to/python bin/spark-submit xxx.py

When you do the upgrade, you could install python 2.7 on every machine
in the cluster, test it with

PYSPARK_PYTHON=python2.7 bin/spark-submit  xxx.py

For YARN, you also need to install python2.7 in every node in the cluster.

On Tue, May 19, 2015 at 7:44 AM, YaoPau jonrgr...@gmail.com wrote:
 We're running Python 2.6.6 here but we're looking to upgrade to 2.7.x  in a
 month.

 Does pyspark work by converting Python into Java Bytecode, or does it run
 Python natively?

 And along those lines, if we're running in yarn-client mode, would we have
 to upgrade just the edge node version of Python, or every node in the
 cluster?



 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/Does-Python-2-7-have-to-be-installed-on-every-cluster-node-tp22945.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Stratified sampling with DataFrames

2015-05-19 Thread Xiangrui Meng
You need to convert DataFrame to RDD, call sampleByKey, and then apply
the schema back to create DataFrame.

val df: DataFrame = ...
val schema = df.schema
val sampledRDD = df.rdd.keyBy(r => r.getAs[Int](0)).sampleByKey(...).values
val sampled = sqlContext.createDataFrame(sampledRDD, schema)

Hopefully this would be much easier in 1.5.

Best,
Xiangrui

On Mon, May 11, 2015 at 12:32 PM, Karthikeyan Muthukumar
mkarthiksw...@gmail.com wrote:
 Hi,
 I'm in Spark 1.3.0 and my data is in DataFrames.
 I need operations like sampleByKey(), sampleByKeyExact().
 I saw the JIRA Add approximate stratified sampling to DataFrame
 (https://issues.apache.org/jira/browse/SPARK-7157).
 That's targeted for Spark 1.5, till that comes through, whats the easiest
 way to accomplish the equivalent of sampleByKey() and sampleByKeyExact() on
 DataFrames.
 Thanks  Regards
 MK


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark Streaming + Kafka failure recovery

2015-05-19 Thread Cody Koeninger
If you checkpoint, the job will start from the successfully consumed
offsets.  If you don't checkpoint, by default it will start from the
highest available offset, and you will potentially lose data.

Is the link I posted, or for that matter the scaladoc, really not clear on
that point?

The scaladoc says:

 To recover from driver failures, you have to enable checkpointing in the
StreamingContext
http://spark.apache.org/docs/latest/api/scala/org/apache/spark/streaming/StreamingContext.html.
The information on consumed offset can be recovered from the checkpoint.
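
A rough sketch of wiring that up with the direct stream (the checkpoint
directory, broker list and topic are hypothetical placeholders):

  import kafka.serializer.StringDecoder
  import org.apache.spark.SparkConf
  import org.apache.spark.streaming.{Seconds, StreamingContext}
  import org.apache.spark.streaming.kafka.KafkaUtils

  val checkpointDir = "hdfs:///tmp/kafka-direct-checkpoint"

  def createContext(): StreamingContext = {
    val ssc = new StreamingContext(new SparkConf().setAppName("KafkaDirect"), Seconds(60))
    ssc.checkpoint(checkpointDir)
    val kafkaParams = Map("metadata.broker.list" -> "broker1:9092")
    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, Set("logs"))
    stream.foreachRDD { rdd => println(s"processed ${rdd.count()} messages") } // replace with a real sink
    ssc
  }

  // a clean start builds a new context; after a failure the context is rebuilt
  // from the checkpoint and resumes from the stored offsets
  val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
  ssc.start()
  ssc.awaitTermination()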

On Tue, May 19, 2015 at 2:38 PM, Bill Jay bill.jaypeter...@gmail.com
wrote:

 If a Spark streaming job stops at 12:01 and I resume the job at 12:02,
 will it still start to consume the data that were produced to Kafka at
 12:01? Or will it just start consuming from the current time?


 On Tue, May 19, 2015 at 10:58 AM, Cody Koeninger c...@koeninger.org
 wrote:

 Have you read
 https://github.com/koeninger/kafka-exactly-once/blob/master/blogpost.md ?

 1.  There's nothing preventing that.

 2. Checkpointing will give you at-least-once semantics, provided you have
 sufficient kafka retention.  Be aware that checkpoints aren't recoverable
 if you upgrade code.

 On Tue, May 19, 2015 at 12:42 PM, Bill Jay bill.jaypeter...@gmail.com
 wrote:

 Hi all,

 I am currently using Spark streaming to consume and save logs every hour
 in our production pipeline. The current setting is to run a crontab job to
 check every minute whether the job is still there and if not resubmit a
 Spark streaming job. I am currently using the direct approach for Kafka
 consumer. I have two questions:

 1. In the direct approach, no offset is stored in zookeeper and no group
 id is specified. Can two consumers (one is Spark streaming and the other is
 a Kafak console consumer in Kafka package) read from the same topic from
 the brokers together (I would like both of them to get all messages, i.e.
 publish-subscribe mode)? What about two Spark streaming jobs reading from the
 same topic?

 2. How to avoid data loss if a Spark job is killed? Does checkpointing
 serve this purpose? The default behavior of Spark streaming is to read the
 latest logs. However, if a job is killed, can the new job resume from what
 was left to avoid loosing logs?

 Thanks!

 Bill






How to set the file size for parquet Part

2015-05-19 Thread Richard Grossman
Hi

I'm using Spark 1.3.1, and I can't set the size of the generated Parquet part
files.
The size is only 512 KB, which is really too small; I need to make them bigger.
How can I set this?
Thanks


Re: RandomSplit with Spark-ML and Dataframe

2015-05-19 Thread Olivier Girardot
Thank you !

Le mar. 19 mai 2015 à 21:08, Xiangrui Meng men...@gmail.com a écrit :

 In 1.4, we added RAND as a DataFrame expression, which can be used for
 random split. Please check the example here:

  https://github.com/apache/spark/blob/master/python/pyspark/ml/tuning.py#L214
  -Xiangrui

 On Thu, May 7, 2015 at 8:39 AM, Olivier Girardot
 o.girar...@lateral-thoughts.com wrote:
  Hi,
  is there any best practice to do like in MLLib a randomSplit of
  training/cross-validation set with dataframes and the pipeline API ?
 
  Regards
 
  Olivier.



Re: SQL UserDefinedType can't be saved in parquet file when using assembly jar

2015-05-19 Thread Xiangrui Meng
Hey Jaonary,

I saw this line in the error message:

org.apache.spark.sql.types.DataType$CaseClassStringParser$.apply(dataTypes.scala:163)

CaseClassStringParser is only used in older versions of Spark to parse
schema from JSON. So I suspect that the cluster was running on a old
version of Spark when you use spark-submit to run your assembly jar.

Best,
Xiangrui

On Mon, May 11, 2015 at 7:40 AM, Jaonary Rabarisoa jaon...@gmail.com wrote:
 In this example, everything works except saving to the parquet file.

 On Mon, May 11, 2015 at 4:39 PM, Jaonary Rabarisoa jaon...@gmail.com
 wrote:

 MyDenseVectorUDT does exist in the assembly jar, and in this example all the
 code is in a single file to make sure everything is included.

 On Tue, Apr 21, 2015 at 1:17 AM, Xiangrui Meng men...@gmail.com wrote:

 You should check where MyDenseVectorUDT is defined and whether it was
 on the classpath (or in the assembly jar) at runtime. Make sure the
 full class name (with package name) is used. Btw, UDTs are not public
 yet, so please use it with caution. -Xiangrui

 On Fri, Apr 17, 2015 at 12:45 AM, Jaonary Rabarisoa jaon...@gmail.com
 wrote:
  Dear all,
 
  Here is an example of code to reproduce the issue I mentioned in a
  previous
  mail about saving an UserDefinedType into a parquet file. The problem
  here
  is that the code works when I run it inside intellij idea but fails
  when I
  create the assembly jar and run it with spark-submit. I use the master
  version of  Spark.
 
  @SQLUserDefinedType(udt = classOf[MyDenseVectorUDT])
  class MyDenseVector(val data: Array[Double]) extends Serializable {
override def equals(other: Any): Boolean = other match {
  case v: MyDenseVector =>
java.util.Arrays.equals(this.data, v.data)
  case _ => false
}
  }
 
  class MyDenseVectorUDT extends UserDefinedType[MyDenseVector] {
override def sqlType: DataType = ArrayType(DoubleType, containsNull =
  false)
override def serialize(obj: Any): Seq[Double] = {
  obj match {
case features: MyDenseVector =>
  features.data.toSeq
  }
}
 
override def deserialize(datum: Any): MyDenseVector = {
  datum match {
case data: Seq[_] =>
  new MyDenseVector(data.asInstanceOf[Seq[Double]].toArray)
  }
}
 
override def userClass: Class[MyDenseVector] = classOf[MyDenseVector]
 
  }
 
  case class Toto(imageAnnotation: MyDenseVector)
 
  object TestUserDefinedType {
 
case class Params(input: String = null,
 partitions: Int = 12,
 outputDir: String = "images.parquet")
 
def main(args: Array[String]): Unit = {
 
  val conf = new
  SparkConf().setAppName("ImportImageFolder").setMaster("local[4]")
 
  val sc = new SparkContext(conf)
  val sqlContext = new SQLContext(sc)
 
  import sqlContext.implicits._
 
  val rawImages = sc.parallelize((1 to 5).map(x => Toto(new
  MyDenseVector(Array[Double](x.toDouble))))).toDF()
 
  rawImages.printSchema()
 
  rawImages.show()
 
  rawImages.save("toto.parquet") // This fails with assembly jar
  sc.stop()
 
}
  }
 
 
  My build.sbt is as follow :
 
  libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % sparkVersion % "provided",
  "org.apache.spark" %% "spark-sql" % sparkVersion,
  "org.apache.spark" %% "spark-mllib" % sparkVersion
  )
 
  assemblyMergeStrategy in assembly := {
  case PathList("javax", "servlet", xs @ _*) => MergeStrategy.first
  case PathList("org", "apache", xs @ _*) => MergeStrategy.first
  case PathList("org", "jboss", xs @ _*) => MergeStrategy.first
  //  case PathList(ps @ _*) if ps.last endsWith ".html" =>
  MergeStrategy.first
  //  case "application.conf" =>
  MergeStrategy.concat
  case m if m.startsWith("META-INF") => MergeStrategy.discard
  //case x =>
  //  val oldStrategy = (assemblyMergeStrategy in assembly).value
  //  oldStrategy(x)
  case _ => MergeStrategy.first
  }
 
 
  As I said, this code works without problem when I execute it inside
  intellij
  idea. But when I generate the assembly jar with sbt-assembly and
 
  use spark-submit, I get the following error:
 
  15/04/17 09:34:01 INFO ParquetOutputFormat: Writer version is:
  PARQUET_1_0
  15/04/17 09:34:01 ERROR Executor: Exception in task 3.0 in stage 2.0
  (TID 7)
  java.lang.IllegalArgumentException: Unsupported dataType:
 
  {type:struct,fields:[{name:imageAnnotation,type:{type:udt,class:MyDenseVectorUDT,pyClass:null,sqlType:{type:array,elementType:double,containsNull:false}},nullable:true,metadata:{}}]},
  [1.1] failure: `TimestampType' expected but `{' found
 
 
  {type:struct,fields:[{name:imageAnnotation,type:{type:udt,class:MyDenseVectorUDT,pyClass:null,sqlType:{type:array,elementType:double,containsNull:false}},nullable:true,metadata:{}}]}
  ^
at
 
  org.apache.spark.sql.types.DataType$CaseClassStringParser$.apply(dataTypes.scala:163)
at
 
  org.apache.spark.sql.types.DataType$.fromCaseClassString(dataTypes.scala:98)
  

Re: Increase maximum amount of columns for covariance matrix for principal components

2015-05-19 Thread Xiangrui Meng
We use a dense array to store the covariance matrix on the driver
node. So its length is limited by the integer range, which is 65536 *
65536 (actually half). -Xiangrui

On Wed, May 13, 2015 at 1:57 AM, Sebastian Alfers
sebastian.alf...@googlemail.com wrote:
 Hello,


 in order to compute a huge dataset, the number of columns allowed when
 calculating the covariance matrix is limited:

 https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala#L129

 What is the reason behind this limitation and can it be extended?

 Greetings

 Sebastian

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: k-means core function for temporal geo data

2015-05-19 Thread Xiangrui Meng
I'm not sure whether k-means would converge with this customized
distance measure. You can list (weighted) time as a feature along with
coordinates, and then use Euclidean distance. For other supported
distance measures, you can check Derrick's package:
http://spark-packages.org/package/derrickburns/generalized-kmeans-clustering.
-Xiangrui
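
A rough sketch of the first suggestion, folding a weighted timestamp in as an
extra Euclidean coordinate before calling MLlib's KMeans (the GeoPoint fields,
sample values and the time weight are made-up placeholders):

  import org.apache.spark.{SparkConf, SparkContext}
  import org.apache.spark.mllib.clustering.KMeans
  import org.apache.spark.mllib.linalg.Vectors

  case class GeoPoint(lat: Double, lon: Double, timestamp: Long)

  val sc = new SparkContext(new SparkConf().setAppName("TemporalGeoKMeans"))
  val points = sc.parallelize(Seq(
    GeoPoint(53.55, 10.00, 1431993600L),
    GeoPoint(48.14, 11.58, 1432000800L)))

  // scale time so that a chosen time gap is comparable to a degree of arc
  val timeWeight = 1.0 / 3600.0   // hypothetical: one hour ~ one unit
  val vectors = points.map(p => Vectors.dense(p.lat, p.lon, p.timestamp * timeWeight)).cache()

  val model = KMeans.train(vectors, k = 2, maxIterations = 20)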

On Mon, May 18, 2015 at 2:30 AM, Pa Rö paul.roewer1...@googlemail.com wrote:
 hello,

 I want to cluster geo data (lat, long, timestamp) with k-means. Now I am
 searching for a good core function; I cannot find a good paper or other
 sources for that. Currently I multiply the time distance and the space
 distance:

 public static double dis(GeoData input1, GeoData input2)
 {
double timeDis = Math.abs( input1.getTime() - input2.getTime() );
double geoDis = geoDis(input1, input2); //extra function
return timeDis*geoDis;
 }

 Maybe someone knows a good core function for clustering temporal geo data?
 (I need a citation.)

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: spark mllib kmeans

2015-05-19 Thread Xiangrui Meng
Just curious, what distance measure do you need? -Xiangrui

On Mon, May 11, 2015 at 8:28 AM, Jaonary Rabarisoa jaon...@gmail.com wrote:
 take a look at this
 https://github.com/derrickburns/generalized-kmeans-clustering

 Best,

 Jao

 On Mon, May 11, 2015 at 3:55 PM, Driesprong, Fokko fo...@driesprong.frl
 wrote:

 Hi Paul,

 I would say that it should be possible, but you'll need a different
 distance measure which conforms to your coordinate system.

 2015-05-11 14:59 GMT+02:00 Pa Rö paul.roewer1...@googlemail.com:

 hi,

 Is it possible to use a custom distance measure and another data type as
 a vector?
 I want to cluster temporal geo data.

 best regards
 paul




-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



  1   2   >