StringIndexer + VectorAssembler equivalent to HashingTF?

2015-08-06 Thread praveen S
Is StringIndexer + VectorAssembler equivalent to HashingTF when converting
a document for analysis?
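
As a rough illustration of the difference, here is a Spark-free sketch. The
names EncodingSketch, indexLabels and hashingTf are made up for this example,
and the hashing is simplified rather than Spark's exact implementation.
StringIndexer assigns one index per distinct label (most frequent first),
while HashingTF hashes every term of a document into a fixed number of
buckets and counts occurrences, so the two are not interchangeable.

```scala
// Simplified, Spark-free illustration; not the Spark implementation.
object EncodingSketch {
  // StringIndexer-like: one index per distinct label, most frequent label first
  // (ties broken alphabetically here, which is an assumption of this sketch).
  def indexLabels(labels: Seq[String]): Map[String, Int] =
    labels.groupBy(identity).toSeq
      .sortBy { case (label, occurrences) => (-occurrences.size, label) }
      .map(_._1)
      .zipWithIndex
      .toMap

  // HashingTF-like: bucket = non-negative hash modulo numFeatures,
  // value = number of terms that landed in that bucket.
  def hashingTf(terms: Seq[String], numFeatures: Int): Map[Int, Double] =
    terms
      .map(t => ((t.hashCode % numFeatures) + numFeatures) % numFeatures)
      .groupBy(identity)
      .map { case (bucket, hits) => bucket -> hits.size.toDouble }
}
```

With indexing, every distinct label needs its own entry in the mapping; with
hashing, the vector size is fixed up front and distinct terms may collide in
the same bucket.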


Re: All masters are unresponsive! Giving up.

2015-08-06 Thread Sonal Goyal
There seems to be a version mismatch somewhere. You can try to find the
cause with debug serialization information. I think the JVM flag
-Dsun.io.serialization.extendedDebugInfo=true should help.
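
One way to check for such a mismatch, as a minimal sketch: print the
serialVersionUID that a given classpath computes for the class named in the
exception, then compare the value seen by the driver with the one seen by
the master. SerialUid and serialVersionUidOf are illustrative names, not
part of Spark.

```scala
import java.io.ObjectStreamClass

object SerialUid {
  // Returns the serialVersionUID the current classpath computes for className,
  // or None if the class cannot be found or is not serializable.
  def serialVersionUidOf(className: String): Option[Long] =
    try {
      val cls = Class.forName(className)
      // lookup returns null for classes that are not Serializable/Externalizable.
      Option(ObjectStreamClass.lookup(cls)).map(_.getSerialVersionUID)
    } catch {
      case _: ClassNotFoundException => None
    }
}
```

Calling SerialUid.serialVersionUidOf("org.apache.spark.deploy.Command") once
with the application's dependencies and once with the cluster's Spark jars
should reproduce the two values from the master log if the jars differ. One
thing that might be worth comparing (an assumption, not confirmed in this
thread) is the Scala version: the build.sbt quoted below sets scalaVersion
to 2.11.6, while the pre-built Spark 1.4.1 downloads were, as far as I know,
compiled for Scala 2.10.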

Best Regards,
Sonal
Founder, Nube Technologies 


On Fri, Aug 7, 2015 at 4:42 AM, Jeff Jones 
wrote:

> I wrote a very simple Spark 1.4.1 app that I can run through a local
> driver program just fine using setMaster("local[*]").  The app is as
> follows:
>
>
>
> import org.apache.spark.SparkContext
>
> import org.apache.spark.SparkContext._
>
> import org.apache.spark.SparkConf
>
> import org.apache.spark.rdd.RDD
>
>
>
> object Hello {
>
>   def main(args: Array[String]): Unit = {
>
> val conf = new SparkConf().setAppName("Simple
> Application").setMaster("local[*]")
>
> val sc = new SparkContext(conf)
>
> val data:RDD[Int] = sc.parallelize(Seq(1,2,12,34,2354,123,100), 2)
>
> println("Max: " + data.max)
>
> println("Min: " + data.min)
>
>   }
>
> }
>
>
>
> I compile this using the following build.sbt which will pull the needed
> Spark libraries for me.
>
>
>
> name := """SparkyJeff"""
>
>
>
> version := "1.0"
>
>
>
> scalaVersion := "2.11.6"
>
>
>
> // Change this to another test framework if you prefer
>
> libraryDependencies ++= Seq(
>
> "org.apache.spark" %% "spark-core" % "1.4.1",
>
> "org.apache.spark" %% "spark-sql"  % "1.4.1")
>
>
>
> // Uncomment to use Akka
>
> //libraryDependencies += "com.typesafe.akka" %% "akka-actor" % "2.3.11"
>
>
>
> fork := true
>
>
>
> Now I'm trying to run this against a standalone cluster by changing the
> setMaster("local[*]") to setMaster("spark://p3.ourdomain.com:7077"). I
> downloaded Spark 1.4.1 for Hadoop 2.6 or greater.  Set the
> SPARK_MASTER_IP="p3.ourdomain.com", SPARK_WORKER_CORES="1000",
> SPARK_WORKER_MEMORY="500g" and then started the cluster using run-all.sh.
> The cluster appears to start fine. I can hit cluster UI at
> p3.ourdomain.com:8080 and see the same master URL as mentioned above.
>
>
>
> Now when I run my little app I get the following client error:
>
>
>
> …
>
> [error] 15/08/05 16:03:40 INFO AppClient$ClientActor: Connecting to master
> akka.tcp://sparkmas...@p3.ourdomain.com:7077/user/Master...
>
> [error] 15/08/05 16:03:40 WARN ReliableDeliverySupervisor: Association
> with remote system [akka.tcp://sparkmas...@p3.ourdomain.com:7077] has
> failed, address is now gated for [5000] ms. Reason is: [Disassociated].
>
> [error] 15/08/05 16:04:00 INFO AppClient$ClientActor: Connecting to master
> akka.tcp://sparkmas...@p3.ourdomain.com:7077/user/Master...
>
> [error] 15/08/05 16:04:00 WARN ReliableDeliverySupervisor: Association
> with remote system [akka.tcp://sparkmas...@p3.ourdomain.com:7077] has
> failed, address is now gated for [5000] ms. Reason is: [Disassociated].
>
> [error] 15/08/05 16:04:20 INFO AppClient$ClientActor: Connecting to master
> akka.tcp://sparkmas...@p3.ourdomain.com:7077/user/Master...
>
> [error] 15/08/05 16:04:20 WARN ReliableDeliverySupervisor: Association
> with remote system [akka.tcp://sparkmas...@p3.ourdomain.com:7077] has
> failed, address is now gated for [5000] ms. Reason is: [Disassociated].
>
> [error] 15/08/05 16:04:40 ERROR SparkDeploySchedulerBackend: Application
> has been killed. Reason: All masters are unresponsive! Giving up.
>
> …
>
>
>
> Looking into the master logs I find:
>
>
>
> 15/08/06 22:52:28 INFO Master: akka.tcp://sparkDriver@192.168.137.41:48877
> got disassociated, removing it.
>
> 15/08/06 22:52:46 ERROR Remoting: org.apache.spark.deploy.Command; local
> class incompatible: stream classdesc serialVersionUID =
> -7098307370860582211, local class serialVersionUID = -3335312719467547622
>
> java.io.InvalidClassException: org.apache.spark.deploy.Command; local
> class incompatible: stream classdesc serialVersionUID =
> -7098307370860582211, local class serialVersionUID = -3335312719467547622
>
> at
> java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:621)
>
> at
> java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1623)
>
> at
> java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518)
>
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774)
>
> at
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
>
> at
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
>
> at
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
>
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
>
> at
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
>
> at
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
>
> at
>

Spark-submit fails when jar is in HDFS

2015-08-06 Thread abraithwaite
Hi All,

We're trying to run spark with mesos and docker in client mode (since mesos
doesn't support cluster mode) and load the application Jar from HDFS.  The
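following is the command we're running:

/usr/local/spark/bin/spark-submit --master mesos://mesos.master:5050 --conf
spark.mesos.executor.docker.image=docker.repo/spark:latest --class
org.apache.spark.examples.SparkPi
hdfs://hdfs1/tmp/spark-examples-1.4.1-hadoop2.6.0-cdh5.4.4.jar 100

We're getting the following warning before an exception from that command:

Warning: Skip remote jar
hdfs://hdfs1/tmp/spark-examples-1.4.1-hadoop2.6.0-cdh5.4.4.jar.
java.lang.ClassNotFoundException: org.apache.spark.examples.SparkPi
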
Before I debug further, is this even supported?  I started reading the code
and it wasn't clear that it's possible to load a remote jar in client mode
at all.  I did see a related issue in [2] but it didn't quite clarify
everything I was looking for.

Thanks,
- Alan

[1] https://spark.apache.org/docs/latest/submitting-applications.html

[2]
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-submit-not-working-when-application-jar-is-in-hdfs-td21840.html



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-submit-fails-when-jar-is-in-HDFS-tp24163.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Out of memory with twitter spark streaming

2015-08-06 Thread Pankaj Narang
Hi 

I am running an application with Activator that retrieves tweets and stores
them in a MySQL database using the code below.

I get an OOM error after 5-6 hours with -Xmx1048M. Increasing the memory only
delays the OOM.

Can anybody give me a clue? Here is the code:

import org.apache.spark.streaming.twitter.TwitterUtils

// ssc, keywords and DBQuery are defined elsewhere in the application.
val tweetStream = TwitterUtils.createStream(ssc, None, keywords)

// Flatten each status into one "|$"-delimited line.
val tweets = tweetStream.map { tweet =>
  val user = tweet.getUser
  val replyStatusId = tweet.getInReplyToStatusId
  val reTweetStatus = tweet.getRetweetedStatus
  var pTweetId = -1L
  var pcreatedAt = 0L
  if (reTweetStatus != null) {
    pTweetId = reTweetStatus.getId
    pcreatedAt = reTweetStatus.getCreatedAt.getTime
  }
  tweet.getCreatedAt.getTime + "|$" + tweet.getId + "|$" + user.getId +
    "|$" + user.getName + "|$" + user.getScreenName + "|$" +
    user.getDescription + "|$" + tweet.getText.trim + "|$" +
    user.getFollowersCount + "|$" + user.getFriendsCount + "|$" +
    tweet.getGeoLocation + "|$" + user.getLocation + "|$" +
    user.getBiggerProfileImageURL + "|$" + replyStatusId + "|$" +
    pTweetId + "|$" + pcreatedAt
}

tweets.foreachRDD { tweetsRDD =>
  // distinct() returns a new RDD; the original code discarded its result.
  val distinctTweets = tweetsRDD.distinct()
  val count = distinctTweets.count
  println("*" + "%s tweets found on this RDD".format(count))
  if (count > 0) {
    val timeMs = System.currentTimeMillis
    val counter = DBQuery.getProcessedCount()
    val location = "tweets/" + counter + "/"
    // collect() pulls the whole batch onto the driver before saving.
    distinctTweets.collect().foreach(tweet => DBQuery.saveTweets(tweet))
    // distinctTweets.saveAsTextFile(location + timeMs + ".txt")
    DBQuery.addTweetRDD(counter)
  }
}

println("tweets for the last stream are saved which can be processed later")
// Checkpoint directory to recover from failures
val checkpointDir = "f:/svn1/checkpoint/"
ssc.checkpoint(checkpointDir)
ssc.start()
ssc.awaitTermination()
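
This is not a diagnosis of the OOM, but one pattern that may be worth trying
is to write each partition from the executors in bounded batches instead of
collecting every batch onto the driver. The helper below is a minimal,
Spark-free sketch; BatchWriter and saveInBatches are hypothetical names, and
the save function stands in for whatever batched insert the database layer
offers.

```scala
object BatchWriter {
  // Feeds records to `save` in groups of at most batchSize and
  // returns the total number of records handed over.
  def saveInBatches[A](records: Iterator[A], batchSize: Int)(save: Seq[A] => Unit): Int = {
    require(batchSize > 0, "batchSize must be positive")
    var written = 0
    records.grouped(batchSize).foreach { batch =>
      save(batch)
      written += batch.size
    }
    written
  }
}
```

Inside foreachRDD this could be called from rdd.foreachPartition, passing the
partition iterator as records, so that only one batch per partition is held
in memory at a time.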


regards
Pankaj






Re: Re: How can I know currently supported functions in Spark SQL

2015-08-06 Thread Pedro Rodriguez
Worth noting that Spark 1.5 is extending that list of Spark SQL functions
quite a bit. Not sure where in the docs they would be yet, but the JIRA is
here: https://issues.apache.org/jira/browse/SPARK-8159

On Thu, Aug 6, 2015 at 7:27 PM, Netwaver  wrote:

> Thanks for your kind help.
>
>
>
>
>
>
> At 2015-08-06 19:28:10, "Todd Nist"  wrote:
>
> They are covered here in the docs:
>
>
> http://spark.apache.org/docs/1.4.1/api/scala/index.html#org.apache.spark.sql.functions$
>
>
> On Thu, Aug 6, 2015 at 5:52 AM, Netwaver  wrote:
>
>> Hi All,
>>  I am using Spark 1.4.1 and want to know how I can find the complete
>> list of functions supported in Spark SQL. Currently I only know
>> 'sum', 'count', 'min', 'max'. Thanks a lot.
>>
>>
>>
>
>
>


-- 
Pedro Rodriguez
PhD Student in Distributed Machine Learning | CU Boulder
UC Berkeley AMPLab Alumni

ski.rodrig...@gmail.com | pedrorodriguez.io | 208-340-1703
Github: github.com/EntilZha | LinkedIn:
https://www.linkedin.com/in/pedrorodriguezscience


Re: How to binarize data in spark

2015-08-06 Thread Yanbo Liang
I think you want to flatten the 1M products to a vector of 1M elements, most
of which are of course zero.
It looks like HashingTF can help you.
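
For illustration, a minimal Spark-free sketch of the binarization itself,
using the sample rows quoted below. BinarizeSketch, Purchase, productIndex
and sparseRows are made-up names. Each product gets a column index, and each
user is represented only by the sorted indices of the columns that are 1, so
the dense vector with all the zeros never has to be built.

```scala
object BinarizeSketch {
  final case class Purchase(user: String, label: String, product: String)

  // Assign each distinct product a column index, in order of first appearance.
  def productIndex(rows: Seq[Purchase]): Map[String, Int] =
    rows.map(_.product).distinct.zipWithIndex.toMap

  // For each user: the sorted indices of the columns whose value is 1.
  // Every other column is implicitly 0.
  def sparseRows(rows: Seq[Purchase]): Map[String, Seq[Int]] = {
    val index = productIndex(rows)
    rows.groupBy(_.user).map { case (user, rs) =>
      user -> rs.map(r => index(r.product)).distinct.sorted
    }
  }
}
```

The index arrays produced this way have the shape that MLlib's
Vectors.sparse(size, indices, values) expects, with every value set to 1.0.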

2015-08-07 11:02 GMT+08:00 praveen S :

> Use StringIndexer in MLlib 1.4:
>
> https://spark.apache.org/docs/1.4.0/api/java/org/apache/spark/ml/feature/StringIndexer.html
>
> On Thu, Aug 6, 2015 at 8:49 PM, Adamantios Corais <
> adamantios.cor...@gmail.com> wrote:
>
>> I have a set of data based on which I want to create a classification
>> model. Each row has the following form:
>>
>> user1,class1,product1
>>> user1,class1,product2
>>> user1,class1,product5
>>> user2,class1,product2
>>> user2,class1,product5
>>> user3,class2,product1
>>> etc
>>
>>
>> There are about 1M users, 2 classes, and 1M products. What I would like
>> to do next is create the sparse vectors (something already supported by
>> MLlib) BUT in order to apply that function I have to create the dense vectors
>> (with the 0s), first. In other words, I have to binarize my data. What's
>> the easiest (or most elegant) way of doing that?
>>
>>
>> *// Adamantios*
>>
>>
>>
>


stopping spark stream app

2015-08-06 Thread Shushant Arora
Hi

I am using Spark Streaming 1.3 with a custom checkpoint to save Kafka
offsets.

1. Is it safe to handle the stop with a shutdown hook like this?

Runtime.getRuntime().addShutdownHook(new Thread() {
  @Override
  public void run() {
    jssc.stop(true, true);
    System.out.println("Inside Add Shutdown Hook");
  }
});

2. Do I also need to save the checkpoint in the shutdown hook, or will the
driver handle it automatically, since it gracefully stops the stream and
lets the foreachRDD function on the stream complete?
directKafkaStream.foreachRDD(new Function, Void>() {
}
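
Whether the hook alone is sufficient in 1.3 is not settled in this thread.
One small precaution, sketched here in Scala under the assumption that the
application may also stop the context on its normal exit path, is to route
every stop through a guard so the stop logic runs only once. StopGuard is a
hypothetical helper, not a Spark class.

```scala
import java.util.concurrent.atomic.AtomicBoolean

final class StopGuard(stop: () => Unit) {
  private val stopped = new AtomicBoolean(false)

  // Runs `stop` on the first call only; returns whether this call did the stop.
  def stopOnce(): Boolean =
    if (stopped.compareAndSet(false, true)) { stop(); true } else false

  // Registers a JVM shutdown hook that goes through the same guard.
  def install(): Unit = {
    sys.addShutdownHook { stopOnce(); () }
    ()
  }
}
```

With the streaming context from the question, the stop function would wrap
jssc.stop(true, true), and the normal exit path would call stopOnce() as
well.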

Thanks


Re: Spark MLib v/s SparkR

2015-08-06 Thread praveen S
I am starting off with classification models: Logistic and RandomForest.
Basically, I want to learn machine learning.
Since I have a Java background I started off with MLlib, but later heard
that R works as well (with scaling issues only).

So I was wondering whether SparkR resolves the scaling issue; hence my
question: why not go with R and SparkR alone (setting aside my inclination
towards Java)?

On Thu, Aug 6, 2015 at 12:28 AM, Charles Earl 
wrote:

> What machine learning algorithms are you interested in exploring or using?
> Start from there or better yet the problem you are trying to solve, and
> then the selection may be evident.
>
>
> On Wednesday, August 5, 2015, praveen S  wrote:
>
>> I was wondering when one should go for MLlib or SparkR. What are the
>> criteria, or what should be considered, before choosing either of the
>> solutions for data analysis?
>> Or what are the advantages of Spark MLlib over SparkR, or of SparkR over
>> MLlib?
>>
>
>
> --
> - Charles
>


Re: How to binarize data in spark

2015-08-06 Thread praveen S
Use StringIndexer in MLlib 1.4:
https://spark.apache.org/docs/1.4.0/api/java/org/apache/spark/ml/feature/StringIndexer.html

On Thu, Aug 6, 2015 at 8:49 PM, Adamantios Corais <
adamantios.cor...@gmail.com> wrote:

> I have a set of data based on which I want to create a classification
> model. Each row has the following form:
>
> user1,class1,product1
>> user1,class1,product2
>> user1,class1,product5
>> user2,class1,product2
>> user2,class1,product5
>> user3,class2,product1
>> etc
>
>
> There are about 1M users, 2 classes, and 1M products. What I would like to
> do next is create the sparse vectors (something already supported by MLlib)
> BUT in order to apply that function I have to create the dense vectors
> (with the 0s), first. In other words, I have to binarize my data. What's
> the easiest (or most elegant) way of doing that?
>
>
> *// Adamantios*
>
>
>


Re:Re: How can I know currently supported functions in Spark SQL

2015-08-06 Thread Netwaver
Thanks for your kind help.






At 2015-08-06 19:28:10, "Todd Nist"  wrote:

They are covered here in the docs:

http://spark.apache.org/docs/1.4.1/api/scala/index.html#org.apache.spark.sql.functions$





On Thu, Aug 6, 2015 at 5:52 AM, Netwaver  wrote:

Hi All,
 I am using Spark 1.4.1 and want to know how I can find the complete
list of functions supported in Spark SQL. Currently I only know
'sum', 'count', 'min', 'max'. Thanks a lot.







Spark-submit fails when jar is in HDFS

2015-08-06 Thread Alan Braithwaite
Hi All,

We're trying to run spark with mesos and docker in client mode (since mesos
doesn't support cluster mode) and load the application Jar from HDFS.  The
following is the command we're running:

/usr/local/spark/bin/spark-submit --master mesos://mesos.master:5050 --conf
spark.mesos.executor.docker.image=docker.repo/spark:latest --class
org.apache.spark.examples.SparkPi
hdfs://hdfs1/tmp/spark-examples-1.4.1-hadoop2.6.0-cdh5.4.4.jar 100

We're getting the following warning before an exception from that command:

Warning: Skip remote jar
hdfs://hdfs1/tmp/spark-examples-1.4.1-hadoop2.6.0-cdh5.4.4.jar.
java.lang.ClassNotFoundException: org.apache.spark.examples.SparkPi

Before I debug further, is this even supported?  I started reading the code
and it wasn't clear that it's possible to load a remote jar in client mode
at all.  I did see a related issue in [2] but it didn't quite clarify
everything I was looking for.

Thanks,
- Alan

[1] https://spark.apache.org/docs/latest/submitting-applications.html

[2]
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-submit-not-working-when-application-jar-is-in-hdfs-td21840.html


All masters are unresponsive! Giving up.

2015-08-06 Thread Jeff Jones
I wrote a very simple Spark 1.4.1 app that I can run through a local driver 
program just fine using setMaster("local[*]").  The app is as follows:

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD

object Hello {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Simple Application").setMaster("local[*]")
    val sc = new SparkContext(conf)
    val data: RDD[Int] = sc.parallelize(Seq(1, 2, 12, 34, 2354, 123, 100), 2)
    println("Max: " + data.max)
    println("Min: " + data.min)
  }
}

I compile this using the following build.sbt which will pull the needed Spark 
libraries for me.

name := """SparkyJeff"""

version := "1.0"

scalaVersion := "2.11.6"

// Change this to another test framework if you prefer
libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-core" % "1.4.1",
"org.apache.spark" %% "spark-sql"  % "1.4.1")

// Uncomment to use Akka
//libraryDependencies += "com.typesafe.akka" %% "akka-actor" % "2.3.11"

fork := true

Now I'm trying to run this against a standalone cluster by changing the 
setMaster("local[*]") to setMaster("spark://p3.ourdomain.com:7077"). I 
downloaded Spark 1.4.1 for Hadoop 2.6 or greater.  Set the 
SPARK_MASTER_IP="p3.ourdomain.com", 
SPARK_WORKER_CORES="1000",SPARK_WORKER_MEMORY="500g" and then started the 
cluster using run-all.sh. The cluster appears to start fine. I can hit cluster 
UI at p3.ourdomain.com:8080 and see the same master URL as mentioned above.

Now when I run my little app I get the following client error:

...
[error] 15/08/05 16:03:40 INFO AppClient$ClientActor: Connecting to master 
akka.tcp://sparkmas...@p3.ourdomain.com:7077/user/Master...
[error] 15/08/05 16:03:40 WARN ReliableDeliverySupervisor: Association with 
remote system [akka.tcp://sparkmas...@p3.ourdomain.com:7077] has failed, 
address is now gated for [5000] ms. Reason is: [Disassociated].
[error] 15/08/05 16:04:00 INFO AppClient$ClientActor: Connecting to master 
akka.tcp://sparkmas...@p3.ourdomain.com:7077/user/Master...
[error] 15/08/05 16:04:00 WARN ReliableDeliverySupervisor: Association with 
remote system [akka.tcp://sparkmas...@p3.ourdomain.com:7077] has failed, 
address is now gated for [5000] ms. Reason is: [Disassociated].
[error] 15/08/05 16:04:20 INFO AppClient$ClientActor: Connecting to master 
akka.tcp://sparkmas...@p3.ourdomain.com:7077/user/Master...
[error] 15/08/05 16:04:20 WARN ReliableDeliverySupervisor: Association with 
remote system [akka.tcp://sparkmas...@p3.ourdomain.com:7077] has failed, 
address is now gated for [5000] ms. Reason is: [Disassociated].
[error] 15/08/05 16:04:40 ERROR SparkDeploySchedulerBackend: Application has 
been killed. Reason: All masters are unresponsive! Giving up.
...

Looking into the master logs I find:

15/08/06 22:52:28 INFO Master: akka.tcp://sparkDriver@192.168.137.41:48877 got 
disassociated, removing it.
15/08/06 22:52:46 ERROR Remoting: org.apache.spark.deploy.Command; local class 
incompatible: stream classdesc serialVersionUID = -7098307370860582211, local 
class serialVersionUID = -3335312719467547622
java.io.InvalidClassException: org.apache.spark.deploy.Command; local class 
incompatible: stream classdesc serialVersionUID = -7098307370860582211, local 
class serialVersionUID = -3335312719467547622
at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:621)
at 
java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1623)
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518)
at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at 
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at 
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
at 
akka.serialization.JavaSerializer$$anonfun$1.apply(Serializer.scala:136)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at akka.serialization.JavaSerializer.fromBinary(Serializer.scala:136)
at 
akka.serialization.Serialization$$anonfun$deserialize$1.apply(Serialization.scala:104)
at scala.util.Try$.apply(Try.scala:161)
at akka.serialization.Serialization.deserialize(Serialization.scala:98)
at 
a

Re: shutdown local hivecontext?

2015-08-06 Thread Cesar Flores
I managed to solve that issue by running my tests on a Linux system instead
of Windows (which I was originally using). However, I now get an error when
I try to reset the Hive context using hc.reset(). It tries to create a file
inside the directory /user/my_user_name instead of the usual Linux path
/home/my_user_name, and that fails.



On Thu, Aug 6, 2015 at 3:12 PM, Cesar Flores  wrote:

> I tried this approach and still have issues. Apparently TestHive cannot
> delete the Hive metastore directory. The complete error that I get is:
>
> 15/08/06 15:01:29 ERROR Driver: FAILED: Execution Error, return code 1
> from org.apache.hadoop.hive.ql.exec.DDLTask.
> org.apache.hadoop.hive.ql.metadata.HiveException:
> java.lang.NullPointerException
> 15/08/06 15:01:29 ERROR TestHive:
> ==
> HIVE FAILURE OUTPUT
> ==
> SET spark.sql.test=
> SET
> javax.jdo.option.ConnectionURL=jdbc:derby:;databaseName=C:\cygwin64\tmp\sparkHiveMetastore1376676777991703945;create=true
> SET
> hive.metastore.warehouse.dir=C:\cygwin64\tmp\sparkHiveWarehouse5264564710014125096
> FAILED: Execution Error, return code 1 from
> org.apache.hadoop.hive.ql.exec.DDLTask.
> org.apache.hadoop.hive.ql.metadata.HiveException:
> java.lang.NullPointerException
>
> ==
> END HIVE FAILURE OUTPUT
> ==
>
> [error] Uncaught exception when running
> com.dotomi.pipeline.utilitytransformers.SorterTransformerSuite:
> java.lang.ExceptionInInitializerError
> [trace] Stack trace suppressed: run last pipeline/test:testOnly for the
> full output.
> 15/08/06 15:01:29 ERROR Utils: Exception while deleting Spark temp dir:
> C:\cygwin64\tmp\sparkHiveMetastore1376676777991703945
> java.io.IOException: Failed to delete:
> C:\cygwin64\tmp\sparkHiveMetastore1376676777991703945
> at org.apache.spark.util.Utils$.deleteRecursively(Utils.scala:932)
> at
> org.apache.spark.util.Utils$$anon$4$$anonfun$run$1$$anonfun$apply$mcV$sp$2.apply(Utils.scala:181)
> at
> org.apache.spark.util.Utils$$anon$4$$anonfun$run$1$$anonfun$apply$mcV$sp$2.apply(Utils.scala:179)
> at scala.collection.mutable.HashSet.foreach(HashSet.scala:79)
> at
> org.apache.spark.util.Utils$$anon$4$$anonfun$run$1.apply$mcV$sp(Utils.scala:179)
> at
> org.apache.spark.util.Utils$$anon$4$$anonfun$run$1.apply(Utils.scala:177)
> at
> org.apache.spark.util.Utils$$anon$4$$anonfun$run$1.apply(Utils.scala:177)
> at
> org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1617)
> at org.apache.spark.util.Utils$$anon$4.run(Utils.scala:177)
>
> Any new ideas on how to avoid this error? I think the problem may be
> caused by running the tests in sbt, as the created directories stay locked
> until I exit the sbt command shell from which I run the tests. Please let
> me know if you have any other suggestions.
>
>
> Thanks
>
> On Mon, Aug 3, 2015 at 5:56 PM, Michael Armbrust 
> wrote:
>
>> TestHive takes care of creating a temporary directory for each invocation
>> so that multiple test runs won't conflict.
>>
>> On Mon, Aug 3, 2015 at 3:09 PM, Cesar Flores  wrote:
>>
>>>
>>> We are using a local hive context in order to run unit tests. Our unit
>>> tests run perfectly fine if we run them one by one using sbt, as in the
>>> following example:
>>>
>>> >sbt test-only com.company.pipeline.scalers.ScalerSuite.scala
>>> >sbt test-only com.company.pipeline.labels.ActiveUsersLabelsSuite.scala
>>>
>>> However, if we try to run them as:
>>>
>>> >sbt test-only com.company.pipeline.*
>>>
>>> we start to run into issues. It appears that the hive context is not
>>> properly shut down after finishing the first test. Does anyone know how
>>> to attack this problem? The test part in my build.sbt file looks like:
>>>
>>> libraryDependencies += "org.scalatest" % "scalatest_2.10" % "2.0" %
>>> "test",
>>> parallelExecution in Test := false,
>>> fork := true,
>>> javaOptions ++= Seq("-Xms512M", "-Xmx2048M", "-XX:MaxPermSize=2048M",
>>> "-XX:+CMSClassUnloadingEnabled")
>>>
>>> We are working under Spark 1.3.0
>>>
>>>
>>> Thanks
>>> --
>>> Cesar Flores
>>>
>>
>>
>
>
> --
> Cesar Flores
>



-- 
Cesar Flores


Spark Job Failed (Executor Lost & then FS closed)

2015-08-06 Thread ๏̯͡๏
Code:
import java.text.SimpleDateFormat
import java.util.Calendar
import java.sql.Date
import org.apache.spark.storage.StorageLevel

def extract(array: Array[String], index: Integer) = {
  if (index < array.length) {
    array(index).replaceAll("\"", "")
  } else {
    ""
  }
}


case class GuidSess(
  guid: String,
  sessionKey: String,
  sessionStartDate: String,
  siteId: String,
  eventCount: String,
  browser: String,
  browserVersion: String,
  operatingSystem: String,
  experimentChannel: String,
  deviceName: String)

val rowStructText =
  sc.textFile("/user/zeppelin/guidsess/2015/08/05/part-m-1.gz")
val guidSessRDD = rowStructText
  .filter(s => s.length != 1)
  .map(s => s.split(","))
  .map { s =>
    GuidSess(
      extract(s, 0),
      extract(s, 1),
      extract(s, 2),
      extract(s, 3),
      extract(s, 4),
      extract(s, 5),
      extract(s, 6),
      extract(s, 7),
      extract(s, 8),
      extract(s, 9))
  }

val guidSessDF = guidSessRDD.toDF()
guidSessDF.registerTempTable("guidsess")

Once the temp table was created, I ran this query:

select siteid, count(distinct guid) total_visitor,
count(sessionKey) as total_visits
from guidsess
group by siteid

*Metrics:*

Data Size: 170 MB
Spark Version: 1.3.1
YARN: 2.7.x



Timeline:
There is 1 Job, 2 stages with 1 task each.

*1st Stage : mapPartitions*
[image: Inline image 1]

1st stage: task 1 started to fail, and a second attempt was started for the
first task of the first stage. The first attempt failed with "Executor LOST".
When I go to the YARN resource manager and look at that particular host, I
see that it is running fine.

*Attempt #1*
[image: Inline image 2]

*Attempt #2* Executor LOST AGAIN
[image: Inline image 3]
*Attempt 3&4*

*[image: Inline image 4]*



*2nd Stage runJob : SKIPPED*

*[image: Inline image 5]*

Any suggestions ?


-- 
Deepak


Spark-Grid Engine light integration writeup

2015-08-06 Thread David Chin
Hello, all:

I was able to get Spark 1.4.1 and 1.2.0 standalone to run within a Univa
Grid Engine cluster, with some modification to the appropriate sbin
scripts. My write-up is at:


http://linuxfollies.blogspot.com/2015/08/apache-spark-integration-with-grid.html

I'll be glad to get comments from anyone who may be doing something similar.

Cheers,
Dave

-- 
David Chin, Ph.D.
david.c...@drexel.edu
Sr. Systems Administrator, URCF, Drexel U.
http://www.drexel.edu/research/urcf/
https://linuxfollies.blogspot.com/
215.221.4747 (mobile)
https://github.com/prehensilecode


Re: Removing empty partitions before we write to HDFS

2015-08-06 Thread Richard Marscher
Not that I'm aware of. We ran into a similar "issue" where we didn't want
to keep accumulating all these empty part files in storage on S3 or HDFS.
There didn't seem to be any way to do it with an RDD that was free of
performance cost, so we just run a non-Spark post-batch operation to delete
empty files from the write path.
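
A post-batch cleanup of that kind could look roughly like the sketch below.
It works on a local directory only; for HDFS or S3 the same idea would go
through the respective file-system client. EmptyPartCleaner and
deleteEmptyParts are illustrative names, and matching on the "part-" prefix
is an assumption about how the output files are named.

```scala
import java.io.File

object EmptyPartCleaner {
  // Deletes zero-length files named part-* directly under dir and
  // returns the names of the files that were actually removed.
  def deleteEmptyParts(dir: File): Seq[String] = {
    // listFiles returns null if dir is not a readable directory.
    val files = Option(dir.listFiles()).map(_.toSeq).getOrElse(Seq.empty[File])
    val empty = files.filter(f => f.isFile && f.getName.startsWith("part-") && f.length() == 0L)
    empty.filter(_.delete()).map(_.getName).sorted
  }
}
```

Running this after the job has finished writing avoids any extra pass over
the RDD, at the cost of a short window in which the empty files exist.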

On Thu, Aug 6, 2015 at 3:33 PM, Patanachai Tangchaisin 
wrote:

> Currently, I use rdd.isEmpty()
>
> Thanks,
> Patanachai
>
>
>
> On 08/06/2015 12:02 PM, gpatcham wrote:
>
>> Is there a way to filter out empty partitions before I write to HDFS other
>> than using repartition and coalesce?
>>
>>
> --
> Patanachai
>


-- 
*Richard Marscher*
Software Engineer
Localytics



Re: shutdown local hivecontext?

2015-08-06 Thread Cesar Flores
I tried this approach and still have issues. Apparently TestHive cannot
delete the Hive metastore directory. The complete error that I get is:

15/08/06 15:01:29 ERROR Driver: FAILED: Execution Error, return code 1 from
org.apache.hadoop.hive.ql.exec.DDLTask.
org.apache.hadoop.hive.ql.metadata.HiveException:
java.lang.NullPointerException
15/08/06 15:01:29 ERROR TestHive:
==
HIVE FAILURE OUTPUT
==
SET spark.sql.test=
SET
javax.jdo.option.ConnectionURL=jdbc:derby:;databaseName=C:\cygwin64\tmp\sparkHiveMetastore1376676777991703945;create=true
SET
hive.metastore.warehouse.dir=C:\cygwin64\tmp\sparkHiveWarehouse5264564710014125096
FAILED: Execution Error, return code 1 from
org.apache.hadoop.hive.ql.exec.DDLTask.
org.apache.hadoop.hive.ql.metadata.HiveException:
java.lang.NullPointerException

==
END HIVE FAILURE OUTPUT
==

[error] Uncaught exception when running
com.dotomi.pipeline.utilitytransformers.SorterTransformerSuite:
java.lang.ExceptionInInitializerError
[trace] Stack trace suppressed: run last pipeline/test:testOnly for the
full output.
15/08/06 15:01:29 ERROR Utils: Exception while deleting Spark temp dir:
C:\cygwin64\tmp\sparkHiveMetastore1376676777991703945
java.io.IOException: Failed to delete:
C:\cygwin64\tmp\sparkHiveMetastore1376676777991703945
at org.apache.spark.util.Utils$.deleteRecursively(Utils.scala:932)
at
org.apache.spark.util.Utils$$anon$4$$anonfun$run$1$$anonfun$apply$mcV$sp$2.apply(Utils.scala:181)
at
org.apache.spark.util.Utils$$anon$4$$anonfun$run$1$$anonfun$apply$mcV$sp$2.apply(Utils.scala:179)
at scala.collection.mutable.HashSet.foreach(HashSet.scala:79)
at
org.apache.spark.util.Utils$$anon$4$$anonfun$run$1.apply$mcV$sp(Utils.scala:179)
at
org.apache.spark.util.Utils$$anon$4$$anonfun$run$1.apply(Utils.scala:177)
at
org.apache.spark.util.Utils$$anon$4$$anonfun$run$1.apply(Utils.scala:177)
at
org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1617)
at org.apache.spark.util.Utils$$anon$4.run(Utils.scala:177)

Any new ideas on how to avoid this error? I think the problem may be caused
by running the tests in sbt, as the created directories stay locked until I
exit the sbt command shell from which I run the tests. Please let me know
if you have any other suggestions.


Thanks

On Mon, Aug 3, 2015 at 5:56 PM, Michael Armbrust 
wrote:

> TestHive takes care of creating a temporary directory for each invocation
> so that multiple test runs won't conflict.
>
> On Mon, Aug 3, 2015 at 3:09 PM, Cesar Flores  wrote:
>
>>
>> We are using a local hive context in order to run unit tests. Our unit
>> tests run perfectly fine if we run them one by one using sbt, as in the
>> following example:
>>
>> >sbt test-only com.company.pipeline.scalers.ScalerSuite.scala
>> >sbt test-only com.company.pipeline.labels.ActiveUsersLabelsSuite.scala
>>
>> However, if we try to run them as:
>>
>> >sbt test-only com.company.pipeline.*
>>
>> we start to run into issues. It appears that the hive context is not
>> properly shut down after finishing the first test. Does anyone know how
>> to attack this problem? The test part in my build.sbt file looks like:
>>
>> libraryDependencies += "org.scalatest" % "scalatest_2.10" % "2.0" %
>> "test",
>> parallelExecution in Test := false,
>> fork := true,
>> javaOptions ++= Seq("-Xms512M", "-Xmx2048M", "-XX:MaxPermSize=2048M",
>> "-XX:+CMSClassUnloadingEnabled")
>>
>> We are working under Spark 1.3.0
>>
>>
>> Thanks
>> --
>> Cesar Flores
>>
>
>


-- 
Cesar Flores


log4j custom appender ClassNotFoundException with spark 1.4.1

2015-08-06 Thread mlemay
Hi, I'm trying to use a custom log4j appender in my log4j.properties.
It worked perfectly under Spark 1.3.1 but is now broken under 1.4.1.

The appender is shaded/bundled in my fat jar.

Note: I've seen that Spark 1.3.1 uses a different class loader.
See my SO post:
http://stackoverflow.com/questions/31856532/spark-unable-to-load-custom-log4j-properties-from-fat-jar-resources
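
To narrow this down, it may help to check which class loader can actually
see the appender class. The sketch below is only a diagnostic aid with
made-up names (LoaderCheck, visibleTo); it asks a given loader for a class
without initializing it.

```scala
object LoaderCheck {
  // True if `loader` (or one of its parents) can load className.
  // The class is looked up without being initialized.
  def visibleTo(className: String, loader: ClassLoader): Boolean =
    try {
      Class.forName(className, false, loader)
      true
    } catch {
      case _: ClassNotFoundException => false
      case _: LinkageError => false
    }
}
```

Calling it from inside the application with both
ClassLoader.getSystemClassLoader and
Thread.currentThread.getContextClassLoader would show whether
com.ryantenney.log4j.FailoverRedisAppender is visible only to the loader
that holds the fat jar and not to the system loader that log4j uses in the
trace below.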


log4j: Trying to find [log4j.xml] using context classloader sun.misc.Launcher$AppClassLoader@4e25154f.
log4j: Trying to find [log4j.xml] using sun.misc.Launcher$AppClassLoader@4e25154f class loader.
log4j: Trying to find [log4j.xml] using ClassLoader.getSystemResource().
log4j: Trying to find [log4j.properties] using context classloader sun.misc.Launcher$AppClassLoader@4e25154f.
log4j: Using URL [file:/C:/Apps/Spark/spark-1.4.1-bin-hadoop2.6/conf/log4j.properties] for automatic log4j configuration.
log4j: Reading configuration from URL file:/C:/Apps/Spark/spark-1.4.1-bin-hadoop2.6/conf/log4j.properties
log4j: Parsing for [root] with value=[WARN, console, redis].
log4j: Level token is [WARN].
log4j: Category root set to WARN
log4j: Parsing appender named "console".
log4j: Parsing layout options for "console".
log4j: Setting property [conversionPattern] to [%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n].
log4j: End of parsing for "console".
log4j: Setting property [target] to [System.err].
log4j: Parsed "console" options.
log4j: Parsing appender named "redis".
log4j:ERROR Could not instantiate class [com.ryantenney.log4j.FailoverRedisAppender].
java.lang.ClassNotFoundException: com.ryantenney.log4j.FailoverRedisAppender
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:264)
    at org.apache.log4j.helpers.Loader.loadClass(Loader.java:198)
    at org.apache.log4j.helpers.OptionConverter.instantiateByClassName(OptionConverter.java:327)
    at org.apache.log4j.helpers.OptionConverter.instantiateByKey(OptionConverter.java:124)
    at org.apache.log4j.PropertyConfigurator.parseAppender(PropertyConfigurator.java:785)
    at org.apache.log4j.PropertyConfigurator.parseCategory(PropertyConfigurator.java:768)
    at org.apache.log4j.PropertyConfigurator.configureRootCategory(PropertyConfigurator.java:648)
    at org.apache.log4j.PropertyConfigurator.doConfigure(PropertyConfigurator.java:514)
    at org.apache.log4j.PropertyConfigurator.doConfigure(PropertyConfigurator.java:580)
    at org.apache.log4j.helpers.OptionConverter.selectAndConfigure(OptionConverter.java:526)
    at org.apache.log4j.LogManager.(LogManager.java:127)
    at org.slf4j.impl.Log4jLoggerFactory.getLogger(Log4jLoggerFactory.java:64)
    at org.slf4j.LoggerFactory.getLogger(LoggerFactory.java:285)
    at org.apache.commons.logging.impl.SLF4JLogFactory.getInstance(SLF4JLogFactory.java:155)
    at org.apache.commons.logging.impl.SLF4JLogFactory.getInstance(SLF4JLogFactory.java:132)
    at org.apache.commons.logging.LogFactory.getLog(LogFactory.java:275)
    at org.apache.hadoop.util.ShutdownHookManager.(ShutdownHookManager.java:44)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:264)
    at org.apache.spark.util.SparkShutdownHookManager$$anonfun$18.apply(Utils.scala:2262)
    at org.apache.spark.util.SparkShutdownHookManager$$anonfun$18.apply(Utils.scala:2262)
    at scala.util.Try$.apply(Try.scala:161)
    at org.apache.spark.util.SparkShutdownHookManager.install(Utils.scala:2262)
    at org.apache.spark.util.Utils$.(Utils.scala:88)
    at org.apache.spark.util.Utils$.(Utils.scala)
    at org.apache.spark.deploy.SparkSubmitArguments.handleUnknown(SparkSubmitArguments.scala:432)
    at org.apache.spark.launcher.SparkSubmitOptionParser.parse(SparkSubmitOptionParser.java:174)
    at org.apache.spark.deploy.SparkSubmitArguments.(SparkSubmitArguments.scala:91)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:107)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
log4j:ERROR Could not instantiate appender named "redis".





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/log4j-custom-appender-ClassNotFoundException-with-spark-1-4-1-tp24159.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: log4j.xml bundled in jar vs log4.properties in spark/conf

2015-08-06 Thread mlemay
I'm having the same problem here. 




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/log4j-xml-bundled-in-jar-vs-log4-properties-in-spark-conf-tp23923p24158.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: Removing empty partitions before we write to HDFS

2015-08-06 Thread Patanachai Tangchaisin

Currently, I use rdd.isEmpty()

Thanks,
Patanachai


On 08/06/2015 12:02 PM, gpatcham wrote:

Is there a way to filter out empty partitions before I write to HDFS other
than using repartition and coalesce?







--
Patanachai





Re: Spark Kinesis Checkpointing/Processing Delay

2015-08-06 Thread Patanachai Tangchaisin

Hi,

I actually ran into the same problem, although our endpoint is not
ElasticSearch. When the Spark job dies, we lose some data because the
Kinesis checkpoint is already beyond the last point that Spark has processed.


Currently, our workaround is to use Spark's checkpoint mechanism with the
write-ahead log (WAL):


https://spark.apache.org/docs/latest/streaming-programming-guide.html#checkpointing
https://spark.apache.org/docs/latest/streaming-programming-guide.html#deploying-applications

Using checkpointing comes with some disadvantages, for example the
application code cannot be upgraded.


I believe there is some work under way to fix this problem, similar to the
Kafka direct API. Not sure if this is it: https://issues.apache.org/jira/browse/SPARK-9215

Thanks,
Patanachai


On 08/06/2015 12:08 PM, phibit wrote:

Hi! I'm using Spark + Kinesis ASL to process and persist stream data to
ElasticSearch. For the most part it works nicely.

There is a subtle issue I'm running into about how failures are handled.

For example's sake, let's say I am processing a Kinesis stream that produces
400 records per second, continuously.

Kinesis provides a 24hr buffer of data, and I'm setting my Kinesis DStream
consumer to use "TRIM_HORIZON", to mean "go as far back as possible and
start processing the stream data as quickly as possible, until you catch up
to the tip of the stream".

This means that for some period of time, Spark will suck in data from
Kinesis as quickly as it can, let's say at 5000 records per second.

In my specific case, ElasticSearch can gracefully handle 400 writes per
second, but is NOT happy to process 5000 writes per second. Let's say it
only handles 2000 wps. This means that the processing time will exceed the
batch time, scheduling delay in the stream will rise consistently and
batches of data will get "backlogged" for some period of time.

In normal circumstances, this is fine. When the Spark consumers catch up to
"real-time", the data input rate slows to 400rps and the backlogged batches
eventually get flushed to ES. The system stabilizes.

However! It appears to me that the Kinesis consumer actively submits
checkpoints, even though the records may not have been processed yet (since
they are backlogged). If for some reason there is processing delay and the
Spark process crashes, the checkpoint will have advanced too far. If I
resume the Spark Streaming process, there is essentially a gap in my
ElasticSearch data.

In principle I understand the reason for this, but is there a way to adjust
this behavior? Or is there another way to handle this specific problem?
Ideally I would be able to configure the process to only submit Kinesis
checkpoints only after my data is successfully written to ES.

Thanks,
Phil








--
Patanachai





Spark Kinesis Checkpointing/Processing Delay

2015-08-06 Thread phibit
Hi! I'm using Spark + Kinesis ASL to process and persist stream data to
ElasticSearch. For the most part it works nicely.

There is a subtle issue I'm running into about how failures are handled.

For example's sake, let's say I am processing a Kinesis stream that produces
400 records per second, continuously.

Kinesis provides a 24hr buffer of data, and I'm setting my Kinesis DStream
consumer to use "TRIM_HORIZON", to mean "go as far back as possible and
start processing the stream data as quickly as possible, until you catch up
to the tip of the stream".

This means that for some period of time, Spark will suck in data from
Kinesis as quickly as it can, let's say at 5000 records per second.

In my specific case, ElasticSearch can gracefully handle 400 writes per
second, but is NOT happy to process 5000 writes per second. Let's say it
only handles 2000 wps. This means that the processing time will exceed the
batch time, scheduling delay in the stream will rise consistently and
batches of data will get "backlogged" for some period of time.

In normal circumstances, this is fine. When the Spark consumers catch up to
"real-time", the data input rate slows to 400rps and the backlogged batches
eventually get flushed to ES. The system stabilizes.

However! It appears to me that the Kinesis consumer actively submits
checkpoints, even though the records may not have been processed yet (since
they are backlogged). If for some reason there is processing delay and the
Spark process crashes, the checkpoint will have advanced too far. If I
resume the Spark Streaming process, there is essentially a gap in my
ElasticSearch data.

In principle I understand the reason for this, but is there a way to adjust
this behavior? Or is there another way to handle this specific problem?
Ideally I would be able to configure the process to only submit Kinesis
checkpoints only after my data is successfully written to ES.
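
The behaviour asked for here, moving the checkpoint only after the sink
confirms the write, can be sketched in plain Java. This is a toy model under
stated assumptions, not the Spark Streaming or Kinesis API: the class
CheckpointAfterWrite, its run method and the record names are all
hypothetical, and the "sink" is just a predicate that reports success or
failure.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical model, not the Spark or Kinesis API: records are numbered
// 0..n-1 and "checkpoint" is the index of the next record to read on restart.
class CheckpointAfterWrite {
    private int checkpoint = 0;

    int checkpoint() {
        return checkpoint;
    }

    // Processes records in batches, starting from the checkpoint. The
    // checkpoint moves only after the sink reports a successful write, so a
    // failed batch is read again on the next run. Returns false if a write failed.
    boolean run(List<String> stream, int batchSize, Predicate<List<String>> sink) {
        while (checkpoint < stream.size()) {
            int end = Math.min(checkpoint + batchSize, stream.size());
            List<String> batch = new ArrayList<>(stream.subList(checkpoint, end));
            if (!sink.test(batch)) {
                return false;   // leave the checkpoint where it was
            }
            checkpoint = end;   // advance only after the write succeeded
        }
        return true;
    }

    public static void main(String[] args) {
        List<String> stream = Arrays.asList("r0", "r1", "r2", "r3", "r4");
        List<String> written = new ArrayList<>();
        CheckpointAfterWrite consumer = new CheckpointAfterWrite();
        int[] calls = {0};

        // First run: the sink rejects the second batch, as if the job crashed.
        consumer.run(stream, 2, batch -> {
            if (++calls[0] == 2) {
                return false;
            }
            written.addAll(batch);
            return true;
        });
        System.out.println("checkpoint after failure: " + consumer.checkpoint());

        // Restart: reading resumes at the checkpoint, so no record is skipped.
        consumer.run(stream, 2, batch -> {
            written.addAll(batch);
            return true;
        });
        System.out.println("written: " + written);
    }
}
```

The trade-off of this ordering is that a batch whose write partly succeeded
before a crash is replayed, so the sink may see the same record twice; it
suits sinks where repeated writes are harmless.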

Thanks,
Phil





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Kinesis-Checkpointing-Processing-Delay-tp24157.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Removing empty partitions before we write to HDFS

2015-08-06 Thread gpatcham
Is there a way to filter out empty partitions before I write to HDFS other
than using repartition and coalesce?




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Removing-empty-partitions-before-we-write-to-HDFS-tp24156.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: Specifying the role when launching an AWS spark cluster using spark_ec2

2015-08-06 Thread Steve Loughran

There's no support for IAM roles in the s3n:// client code in Apache Hadoop
(HADOOP-9384); Amazon's modified EMR distro may have it.

The s3a filesystem adds it; this is ready for production use in Hadoop 2.7.1+
(implicitly HDP 2.3; CDH 5.4 has cherry-picked the relevant patches). I don't
know about the spark_ec2 scripts or what they start.

> On 6 Aug 2015, at 10:27, SK  wrote:
> 
> Hi,
> 
> I need to access data on S3 from another account and I have been given the
> IAM role information to access that S3 bucket. From what I understand, AWS
> allows us to attach a role to a resource at the time it is created. However,
> I don't see an option for specifying the role using the spark_ec2.py script. 
> So I created a spark cluster using the default role, but I was not able to
> change its IAM role after creation through AWS console.
> 
> I see a ticket for this issue:
> https://github.com/apache/spark/pull/6962 and the status is closed. 
> 
> If anyone knows how I can specify the role using spark_ec2.py, please let me
> know. I am using spark 1.4.1.
> 
> thanks
> 
> 
> 
> 
> 





Re: Error while using ConcurrentHashMap in Spark Streaming

2015-08-06 Thread Ted Yu
bq. aggregationMap.put(countryCode,requestCountPerCountry+1);

If NPE came from the above line, maybe requestCountPerCountry was null ?
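
A minimal sketch of one way to rule that out, assuming the map is a
ConcurrentHashMap<String, Integer> (the generic types are missing from the
quoted code, so they are a guess) and that the first request for a country
should count as 1. ConcurrentHashMap.get returns null for a key that is not
present, and unboxing that null in requestCountPerCountry+1 throws the
NullPointerException. Map.merge stores the given value when the key is
absent and otherwise combines it with the old one. The class and method
names below are hypothetical.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical helper; Map.merge and Map.getOrDefault are the real calls
// being illustrated.
class CountryCounter {
    // Assumed types: country code -> request count.
    private final Map<String, Integer> aggregationMap = new ConcurrentHashMap<>();

    // Adds one to the count for countryCode and returns the new count.
    // merge() stores 1 if the key is absent, otherwise old + 1, so no null
    // is ever unboxed.
    int increment(String countryCode) {
        return aggregationMap.merge(countryCode, 1, Integer::sum);
    }

    // Returns 0 for a country that has not been seen yet.
    int count(String countryCode) {
        return aggregationMap.getOrDefault(countryCode, 0);
    }

    public static void main(String[] args) {
        CountryCounter counter = new CountryCounter();
        counter.increment("IN");
        counter.increment("IN");
        counter.increment("US");
        System.out.println("IN=" + counter.count("IN")
                + " US=" + counter.count("US")
                + " FR=" + counter.count("FR"));
    }
}
```

This only covers a missing key. If aggregationMap itself were null, or if
countryCode were null (ConcurrentHashMap rejects null keys), the exception
would need a different fix.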

Cheers

On Thu, Aug 6, 2015 at 8:54 AM, UMESH CHAUDHARY  wrote:

> Scenario is:
>
>- I have a map of country-code as key and count as value (initially
>count is 0)
>- In DStream.foreachRDD I need to update the count for country in the
>map with new aggregated value
>
> I am doing :
>
> transient Map aggregationMap=new 
> ConcurrentHashMap();
>
>
> Integer requestCountPerCountry=aggregationMap.get(countryCode);
>
> aggregationMap.put(countryCode,requestCountPerCountry+1);   // Getting 
> Following Error in this Line
>
>
> java.lang.NullPointerException
>   at JavaKafkaStream$2.call(JavaKafkaStream.java:107)
>   at JavaKafkaStream$2.call(JavaKafkaStream.java:92)
>   at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$2.apply(JavaDStreamLike.scala:323)
>   at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$2.apply(JavaDStreamLike.scala:323)
>   at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:42)
>   at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:40)
>   at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:40)
>   at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:399)
>   at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:40)
>   at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
>   at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
>   at scala.util.Try$.apply(Try.scala:161)
>   at org.apache.spark.streaming.scheduler.Job.run(Job.scala:34)
>   at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:193)
>   at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:193)
>   at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:193)
>   at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
>   at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:192)
>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>   at java.lang.Thread.run(Thread.java:745)
>
>
>  Is this issue related to :
>
> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
>
> If so how can I resolve this?
>
>


Re: Very high latency to initialize a DataFrame from partitioned parquet database.

2015-08-06 Thread Philip Weaver
With DEBUG, the log output was over 10MB, so I opted for just INFO output.
The (sanitized) log is attached.

The driver is essentially this code:

info("A")

val t = System.currentTimeMillis
val df = sqlContext.read.parquet(dir).select(...).cache

val elapsed = System.currentTimeMillis - t
info(s"Init time: ${elapsed} ms")

We've also observed that it is very slow to read the contents of the
parquet files. My colleague wrote a PySpark application that gets the list
of files, parallelizes it, maps across it and reads each file manually
using a C parquet library, and aggregates manually in the loop. Ignoring
the 1-2 minute initialization cost, compared to a Spark SQL or DataFrame
query in Scala, his is an order of magnitude faster. Since he is
parallelizing the work through Spark, and that isn't causing any
performance issues, it seems to be a problem with the parquet reader. I may
try to do what he did to construct a DataFrame manually, and see if I can
query it with Spark SQL with reasonable performance.

- Philip


On Thu, Aug 6, 2015 at 8:37 AM, Cheng Lian  wrote:

> Would you mind to provide the driver log?
>
>
> On 8/6/15 3:58 PM, Philip Weaver wrote:
>
> I built spark from the v1.5.0-snapshot-20150803 tag in the repo and tried
> again.
>
> The initialization time is about 1 minute now, which is still pretty
> terrible.
>
> On Wed, Aug 5, 2015 at 9:08 PM, Philip Weaver 
> wrote:
>
>> Absolutely, thanks!
>>
>> On Wed, Aug 5, 2015 at 9:07 PM, Cheng Lian < 
>> lian.cs@gmail.com> wrote:
>>
>>> We've fixed this issue in 1.5 https://github.com/apache/spark/pull/7396
>>>
>>> Could you give it a shot to see whether it helps in your case? We've
>>> observed ~50x performance boost with schema merging turned on.
>>>
>>> Cheng
>>>
>>>
>>> On 8/6/15 8:26 AM, Philip Weaver wrote:
>>>
>>> I have a parquet directory that was produced by partitioning by two
>>> keys, e.g. like this:
>>>
>>> df.write.partitionBy("a", "b").parquet("asdf")
>>>
>>>
>>> There are 35 values of "a", and about 1100-1200 values of "b" for each
>>> value of "a", for a total of over 40,000 partitions.
>>>
>>> Before running any transformations or actions on the DataFrame, just
>>> initializing it like this takes *2 minutes*:
>>>
>>> val df = sqlContext.read.parquet("asdf")
>>>
>>>
>>> Is this normal? Is this because it is doing some bookkeeping to discover
>>> all the partitions? Is it perhaps having to merge the schema from each
>>> partition? Would you expect it to get better or worse if I subpartition by
>>> another key?
>>>
>>> - Philip
>>>
>>>
>>>
>>>
>>
>
>
10:51:42  INFO spark.SparkContext: Running Spark version 1.5.0-SNAPSHOT
10:51:42  WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
10:51:42  INFO spark.SecurityManager: Changing view acls to: pweaver
10:51:42  INFO spark.SecurityManager: Changing modify acls to: pweaver
10:51:42  INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(pweaver); users with modify permissions: Set(pweaver)
10:51:43  INFO slf4j.Slf4jLogger: Slf4jLogger started
10:51:43  INFO Remoting: Starting remoting
10:51:43  INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@172.26.21.70:51400]
10:51:43  INFO util.Utils: Successfully started service 'sparkDriver' on port 51400.
10:51:43  INFO spark.SparkEnv: Registering MapOutputTracker
10:51:43  INFO spark.SparkEnv: Registering BlockManagerMaster
10:51:43  INFO storage.DiskBlockManager: Created local directory at /tmp/blockmgr-04438917-93ee-45f3-bc10-c5f5eb3d6a4a
10:51:43  INFO storage.MemoryStore: MemoryStore started with capacity 530.0 MB
10:51:43  INFO spark.HttpFileServer: HTTP File server directory is /tmp/spark-faec22af-bb2d-4fae-8a02-b8ca67867858/httpd-50939810-7da7-42d9-9342-48d9dc2705dc
10:51:43  INFO spark.HttpServer: Starting HTTP Server
10:51:43  INFO server.Server: jetty-8.y.z-SNAPSHOT
10:51:43  INFO server.AbstractConnector: Started SocketConnector@0.0.0.0:55227
10:51:43  INFO util.Utils: Successfully started service 'HTTP file server' on port 55227.
10:51:43  INFO spark.SparkEnv: Registering OutputCommitCoordinator
10:51:43  INFO server.Server: jetty-8.y.z-SNAPSHOT
10:51:43  INFO server.AbstractConnector: Started SelectChannelConnector@0.0.0.0:4040
10:51:43  INFO util.Utils: Successfully started service 'SparkUI' on port 4040.
10:51:43  INFO ui.SparkUI: Started SparkUI at http://172.26.21.70:4040
10:51:43  INFO spark.SparkContext: Added JAR file:/home/pweaver/work/linear_spark-assembly-1.0-deps.jar at http://172.26.21.70:55227/jars/linear_spark-assembly-1.0-deps.jar with timestamp 1438883503937
10:51:43  INFO spark.SparkContext: Added JAR file:/home/pweaver/work/linear_spark_2.11-1.0.jar at http://172.26.21.70:55227/jars/linear_spark_2.11-1.0.jar with timestamp 1438883503940
10:51:44  WARN metrics.MetricsSystem: Using default name DAGScheduler for source beca

Specifying the role when launching an AWS spark cluster using spark_ec2

2015-08-06 Thread SK
Hi,

I need to access data on S3 from another account and I have been given the
IAM role information to access that S3 bucket. From what I understand, AWS
allows us to attach a role to a resource at the time it is created. However,
I don't see an option for specifying the role using the spark_ec2.py script. 
So I created a spark cluster using the default role, but I was not able to
change its IAM role after creation through AWS console.

I see a ticket for this issue:
https://github.com/apache/spark/pull/6962 and the status is closed. 

If anyone knows how I can specify the role using spark_ec2.py, please let me
know. I am using spark 1.4.1.

thanks



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Specifying-the-role-when-launching-an-AWS-spark-cluster-using-spark-ec2-tp24154.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: SparkR -Graphx

2015-08-06 Thread Shivaram Venkataraman
+Xiangrui

I am not sure exposing the entire GraphX API would make sense, as it
contains a lot of low-level functions. However, we could expose some
high-level functions like PageRank. Xiangrui, who has been working
on similar techniques to expose MLlib functions like GLM, might have
more to add.

Thanks
Shivaram

On Thu, Aug 6, 2015 at 6:21 AM, smagadi  wrote:
> I wanted to use GraphX from SparkR. Is there a way to do it? I think that as
> of now it is not possible. I was wondering whether one could write a wrapper
> in R that calls the Scala GraphX libraries.
> Any thoughts on this, please?
>
>
>
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/SparkR-Graphx-tp24152.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
>




RE: Execption while using kryo with broadcast

2015-08-06 Thread Shuai Zheng
Hi,

I have exactly the same issue on Spark 1.4.1 (on EMR, latest default AMI 4.0),
running in YARN client mode. After wrapping the map in another Java HashMap,
the exception disappears.

But what is the right solution? Has a JIRA ticket been created for this? I
would like to track it: the issue can be bypassed by wrapping the map in
another HashMap, but that is ugly, so I want to remove this workaround later.

BTW, the interesting thing is that when I run this in local mode, even with
Kryo, there is no issue (so it passed my unit tests and dev tests).

Regards,

Shuai

From: Jeetendra Gangele [mailto:gangele...@gmail.com] 
Sent: Wednesday, April 15, 2015 10:59 AM
To: Imran Rashid
Cc: Akhil Das; user
Subject: Re: Execption while using kryo with broadcast

 

This worked with Java serialization. I am using 1.2.0; you are right, if I use
1.2.1 or 1.3.0 this issue will not occur.

I will test this and let you know

 

On 15 April 2015 at 19:48, Imran Rashid  wrote:

oh interesting.  The suggested workaround is to wrap the result from
collectAsMap into another hashmap, you should try that:

Map matchData =RddForMarch.collectAsMap();
Map tmp = new HashMap(matchData);
final Broadcast> dataMatchGlobal =
jsc.broadcast(tmp);
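
Why the extra copy helps can be shown without Spark or Kryo. The trace in
this thread ends in java.util.AbstractMap.put throwing
UnsupportedOperationException while the map is being rebuilt during
deserialization. Any map type that inherits AbstractMap.put without
overriding it cannot be refilled entry by entry, whereas a HashMap copy can.
The sketch below uses a hypothetical ReadOnlyView class as a stand-in for
the wrapper returned by collectAsMap; it is an illustration under that
assumption, not the actual Spark class.

```java
import java.util.AbstractMap;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

class WrapDemo {
    // Returns true if the map accepts put(), which is what a deserializer
    // that rebuilds a map entry by entry needs.
    static boolean acceptsPut(Map<String, String> map) {
        try {
            map.put("probe", "value");
            return true;
        } catch (UnsupportedOperationException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        Map<String, String> source = new HashMap<>();
        source.put("acme", "1");

        Map<String, String> view = new ReadOnlyView<>(source);
        Map<String, String> copy = new HashMap<>(view); // the suggested workaround

        System.out.println("view accepts put: " + acceptsPut(view));
        System.out.println("copy accepts put: " + acceptsPut(copy));
    }
}

// Hypothetical stand-in for a read-only map wrapper: it implements only
// entrySet(), so put() falls through to AbstractMap.put, which always throws.
class ReadOnlyView<K, V> extends AbstractMap<K, V> {
    private final Map<K, V> backing;

    ReadOnlyView(Map<K, V> backing) {
        this.backing = backing;
    }

    @Override
    public Set<Map.Entry<K, V>> entrySet() {
        return Collections.unmodifiableSet(backing.entrySet());
    }
}
```

The copy costs one extra pass over the collected map on the driver, which is
usually small compared with the broadcast itself.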

 

Can you please clarify:

* Does it work w/ java serialization in the end?  Or is this kryo only?

* which Spark version you are using? (one of the relevant bugs was fixed in
1.2.1 and 1.3.0)

 

 

 

On Wed, Apr 15, 2015 at 9:06 AM, Jeetendra Gangele 
wrote:

This looks like a known issue. Check this out:

http://apache-spark-user-list.1001560.n3.nabble.com/java-io-InvalidClassException-org-apache-spark-api-java-JavaUtils-SerializableMapWrapper-no-valid-cor-td20034.html

Can you please suggest a workaround? I am broadcasting the HashMap returned
from RDD.collectAsMap().

 

On 15 April 2015 at 19:33, Imran Rashid  wrote:

this is a really strange exception ... I'm especially surprised that it
doesn't work w/ java serialization.  Do you think you could try to boil it
down to a minimal example?

 

On Wed, Apr 15, 2015 at 8:58 AM, Jeetendra Gangele 
wrote:

Yes, without Kryo it did work. When I removed the Kryo registration, it
worked.

 

On 15 April 2015 at 19:24, Jeetendra Gangele  wrote:

It's not working in combination with Broadcast.

Without Kryo it is also not working.

 

 

On 15 April 2015 at 19:20, Akhil Das  wrote:

Is it working without kryo?




Thanks

Best Regards

 

On Wed, Apr 15, 2015 at 6:38 PM, Jeetendra Gangele 
wrote:

Hi all, I am getting the exception below while using the Kryo serializer with a
broadcast variable. I am broadcasting a HashMap with the lines below.

Map matchData =RddForMarch.collectAsMap();

final Broadcast> dataMatchGlobal =
jsc.broadcast(matchData);

15/04/15 12:58:51 ERROR executor.Executor: Exception in task 0.3 in stage 4.0 (TID 7)
java.io.IOException: java.lang.UnsupportedOperationException
    at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1003)
    at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:164)
    at org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:64)
    at org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:64)
    at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:87)
    at com.insideview.yam.CompanyMatcher$4.call(CompanyMatcher.java:103)
    at com.insideview.yam.CompanyMatcher$4.call(CompanyMatcher.java:1)
    at org.apache.spark.api.java.JavaPairRDD$$anonfun$pairFunToScalaFun$1.apply(JavaPairRDD.scala:1002)
    at org.apache.spark.api.java.JavaPairRDD$$anonfun$pairFunToScalaFun$1.apply(JavaPairRDD.scala:1002)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:204)
    at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:58)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
    at org.apache.spark.scheduler.Task.run(Task.scala:56)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.UnsupportedOperationException
    at java.util.AbstractMap.put(AbstractMap.java:203)
    at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:135)
    at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:17)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
    at org.apache.spark.serializer.KryoDeserializationStream.readO

Error while using ConcurrentHashMap in Spark Streaming

2015-08-06 Thread UMESH CHAUDHARY
Scenario is:

   - I have a map of country-code as key and count as value (initially
   count is 0)
   - In DStream.foreachRDD I need to update the count for country in the
   map with new aggregated value

I am doing :

transient Map aggregationMap=new
ConcurrentHashMap();


Integer requestCountPerCountry=aggregationMap.get(countryCode);

aggregationMap.put(countryCode,requestCountPerCountry+1);   // Getting
Following Error in this Line


java.lang.NullPointerException
  at JavaKafkaStream$2.call(JavaKafkaStream.java:107)
  at JavaKafkaStream$2.call(JavaKafkaStream.java:92)
  at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$2.apply(JavaDStreamLike.scala:323)
  at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$2.apply(JavaDStreamLike.scala:323)
  at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:42)
  at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:40)
  at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:40)
  at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:399)
  at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:40)
  at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
  at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
  at scala.util.Try$.apply(Try.scala:161)
  at org.apache.spark.streaming.scheduler.Job.run(Job.scala:34)
  at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:193)
  at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:193)
  at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:193)
  at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
  at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:192)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
  at java.lang.Thread.run(Thread.java:745)


 Is this issue related to :

at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)

If so how can I resolve this?


Terminate streaming app on cluster restart

2015-08-06 Thread Alexander Krasheninnikov

Hello, everyone!
I have the following case when running a standalone cluster: when
stop-all.sh/start-all.sh are invoked on the master, the streaming app loses
all its executors but does not terminate.
Since it is a streaming app that is expected to deliver its results ASAP,
downtime is undesirable.

Is there any workaround to solve that problem?

Thanks a lot.




Re: Very high latency to initialize a DataFrame from partitioned parquet database.

2015-08-06 Thread Cheng Lian

Would you mind to provide the driver log?

On 8/6/15 3:58 PM, Philip Weaver wrote:
I built spark from the v1.5.0-snapshot-20150803 tag in the repo and 
tried again.


The initialization time is about 1 minute now, which is still pretty 
terrible.


On Wed, Aug 5, 2015 at 9:08 PM, Philip Weaver wrote:

Absolutely, thanks!

On Wed, Aug 5, 2015 at 9:07 PM, Cheng Lian <lian.cs@gmail.com> wrote:

We've fixed this issue in 1.5
https://github.com/apache/spark/pull/7396

Could you give it a shot to see whether it helps in your case?
We've observed ~50x performance boost with schema merging
turned on.

Cheng


On 8/6/15 8:26 AM, Philip Weaver wrote:

I have a parquet directory that was produced by partitioning
by two keys, e.g. like this:

df.write.partitionBy("a", "b").parquet("asdf")


There are 35 values of "a", and about 1100-1200 values of "b"
for each value of "a", for a total of over 40,000 partitions.

Before running any transformations or actions on the
DataFrame, just initializing it like this takes *2 minutes*:

val df = sqlContext.read.parquet("asdf")


Is this normal? Is this because it is doing some bookkeeping
to discover all the partitions? Is it perhaps having to merge
the schema from each partition? Would you expect it to get
better or worse if I subpartition by another key?

- Philip
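
To make the scale concrete, here is a small sketch in plain Python (no Spark involved) of the Hive-style directory layout that partitionBy("a", "b") writes, one leaf directory per (a, b) pair. The figure of 1150 values of "b" per "a" is an assumed midpoint of the 1100-1200 range quoted above, and partition_dirs is a hypothetical helper used only for illustration.

```python
def partition_dirs(root, a_values, b_values_for):
    """List the leaf directories of a two-level, Hive-style partitioned layout."""
    return [
        f"{root}/a={a}/b={b}"
        for a in a_values
        for b in b_values_for(a)
    ]

# Assumption: 35 values of "a", and 1150 values of "b" under each of them.
dirs = partition_dirs("asdf", range(35), lambda a: range(1150))

print(len(dirs))   # number of leaf directories that must be discovered
print(dirs[0])     # first leaf directory in the layout
```

Every one of these leaf directories has to be found before the DataFrame can be created, which is consistent with the bookkeeping cost asked about above; whether schema merging adds to it depends on the Spark version and settings.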










How to binarize data in spark

2015-08-06 Thread Adamantios Corais
I have a set of data based on which I want to create a classification
model. Each row has the following form:

user1,class1,product1
user1,class1,product2
user1,class1,product5
user2,class1,product2
user2,class1,product5
user3,class2,product1
etc


There are about 1M users, 2 classes, and 1M products. What I would like to
do next is create the sparse vectors (something already supported by MLlib)
BUT in order to apply that function I have to create the dense vectors
(with the 0s), first. In other words, I have to binarize my data. What's
the easiest (or most elegant) way of doing that?


*// Adamantios*
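
A minimal sketch in plain Python of one way to binarize such rows without ever materializing the dense vectors: assign each product a column index, then keep only the indices that are present for each user. The sample rows are the ones from the message; binarize and product_index are hypothetical names. The (size, indices, values) triple is the shape that MLlib's Vectors.sparse accepts; on a real dataset the grouping would be done with Spark itself rather than in local memory.

```python
from collections import defaultdict

rows = [
    ("user1", "class1", "product1"),
    ("user1", "class1", "product2"),
    ("user1", "class1", "product5"),
    ("user2", "class1", "product2"),
    ("user2", "class1", "product5"),
    ("user3", "class2", "product1"),
]

# Assign each distinct product a column index (sorted for reproducibility).
product_index = {p: i for i, p in enumerate(sorted({r[2] for r in rows}))}

def binarize(rows, product_index):
    """Return {user: (label, size, indices, values)} in sparse form."""
    labels = {}
    seen = defaultdict(set)
    for user, label, product in rows:
        labels[user] = label
        seen[user].add(product_index[product])
    size = len(product_index)
    return {
        user: (labels[user], size, sorted(idx), [1.0] * len(idx))
        for user, idx in seen.items()
    }

features = binarize(rows, product_index)
print(features["user2"])
```

Only the non-zero positions are stored, so the 0s of the dense representation never need to exist.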


Temp file missing when training logistic regression

2015-08-06 Thread Cat
Hello, 

I am using the Python API to perform a grid search and train models using
LogisticRegressionWithSGD. 
I am using r3.xl machines in EC2, running on top of YARN in cluster mode. 

The training RDD is persisted in memory and on disk. Some of the models
train successfully, but then at some point during the grid search I get an
error. It looks like the Python broadcast is looking for a part of the RDD
which is no longer there. I scanned the logs for further errors but could
not find anything. 

Any ideas of what could be causing this, and what should I be looking for? 

Many thanks. 
Cat

  model = LogisticRegressionWithSGD.train(the_training, iterations=i,
regParam=c, miniBatchFraction=0.8)
  File "/home/hadoop/spark/python/pyspark/mllib/classification.py", line
164, in train
return _regression_train_wrapper(train, LogisticRegressionModel, data,
initialWeights)
  File "/home/hadoop/spark/python/pyspark/mllib/regression.py", line 140, in
_regression_train_wrapper
weights, intercept = train_func(data,
_convert_to_vector(initial_weights))
  File "/home/hadoop/spark/python/pyspark/mllib/classification.py", line
162, in train
bool(intercept))
  File "/home/hadoop/spark/python/pyspark/mllib/common.py", line 120, in
callMLlibFunc
return callJavaFunc(sc, api, *args)
  File "/home/hadoop/spark/python/pyspark/mllib/common.py", line 113, in
callJavaFunc
return _java2py(sc, func(*args))
  File
"/home/hadoop/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py",
line 538, in __call__
self.target_id, self.name)
  File
"/home/hadoop/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line
300, in get_return_value
format(target_id, '.', name), value)
Py4JJavaError: An error occurred while calling
o271.trainLogisticRegressionModelWithSGD.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task
serialization failed: java.io.FileNotFoundException:
/mnt/spark/spark-b07b34f8-66c3-43ae-a3ed-0c291724409b/pyspark-4196e8e5-8024-4ec5-a7bb-a60b216e6e74/tmpbCjiSR
(No such file or directory)
java.io.FileInputStream.open(Native Method)
java.io.FileInputStream.<init>(FileInputStream.java:146)
org.apache.spark.api.python.PythonBroadcast$$anonfun$writeObject$1.apply$mcJ$sp(PythonRDD.scala:848)
org.apache.spark.api.python.PythonBroadcast$$anonfun$writeObject$1.apply(PythonRDD.scala:847)
org.apache.spark.api.python.PythonBroadcast$$anonfun$writeObject$1.apply(PythonRDD.scala:847)
org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1153)
org.apache.spark.api.python.PythonBroadcast.writeObject(PythonRDD.scala:847)
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
java.lang.reflect.Method.invoke(Method.java:606)
java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1495)
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:44)
org.apache.spark.serializer.SerializationStream.writeAll(Serializer.scala:110)
org.apache.spark.storage.BlockManager.dataSerializeStream(BlockManager.scala:1176)
org.apache.spark.storage.DiskStore.putIterator(DiskStore.scala:79)
org.apache.spark.storage.DiskStore.putArray(DiskStore.scala:64)
org.apache.spark.storage.BlockManager.dropFromMemory(BlockManager.scala:1028)
org.apache.spark.storage.MemoryStore$$anonfun$ensureFreeSpace$4.apply(MemoryStore.scala:419)
org.apache.spark.storage.MemoryStore$$anonfun$ensureFreeSpace$4.apply(MemoryStore.scala:408)
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
org.apache.spark.storage.MemoryStore.ensureFreeSpace(MemoryStore.scala:408)
org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:263)
org.apache.spark.storage.MemoryStore.putIterator(MemoryStore.scala:136)
org.apache.spark.storage.MemoryStore.putIterator(MemoryStore.scala:114)
org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:786)
org.apache.spark.storage.BlockManager.putIterator(BlockManager.scala:637)
org.apache.spark.storage.BlockManager.putSingle(BlockManager.scala:991)
org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:98)
org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:84)
org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34)
org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:29)
org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:62)
org.apache.spark.SparkContext.broadcast(SparkCo

Re: Upgrade of Spark-Streaming application

2015-08-06 Thread Cody Koeninger
Do the cast to HasOffsetRanges before calling any other methods on the
direct stream.  This is covered in the documentation:

http://spark.apache.org/docs/latest/streaming-kafka-integration.html

If you want to use fromOffsets, you can also just grab the highest
available offsets from Kafka and provide them to the api.  See the
(private) method getLatestLeaderOffsets for an example.
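
Why the cast has to come first can be illustrated with a toy model in plain Python. The classes below are hypothetical stand-ins, not Spark classes: only the RDD handed out by the direct stream carries the offset metadata, and any transformation returns a different kind of RDD that no longer has it.

```python
class ToyKafkaRDD:
    """Stand-in for the RDD produced directly by the direct stream."""

    def __init__(self, records, offset_ranges):
        self.records = records
        self.offset_ranges = offset_ranges  # only the source RDD carries this

    def map(self, fn):
        # A transformation returns a plain RDD without the offset metadata.
        return ToyMappedRDD([fn(r) for r in self.records])


class ToyMappedRDD:
    """Stand-in for the result of a transformation."""

    def __init__(self, records):
        self.records = records


def process_batch(rdd):
    ranges = rdd.offset_ranges          # 1. read the offsets from the source RDD
    values = rdd.map(lambda kv: kv[1])  # 2. only then transform
    return ranges, values.records


source = ToyKafkaRDD([("k1", "a"), ("k2", "b")], [("topic", 0, 100, 102)])
ranges, values = process_batch(source)
mapped_has_offsets = hasattr(source.map(lambda kv: kv[1]), "offset_ranges")
print(ranges, values, mapped_has_offsets)
```

In the real API the equivalent step is to capture the offset ranges in the first operation applied to the direct stream and to carry them alongside the transformed data.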

On Wed, Aug 5, 2015 at 10:35 PM, Shushant Arora 
wrote:

> Hi
>
> For checkpointing and using the fromOffsets argument: say that when my app
> starts for the first time I don't have any previous state stored and I want
> to start consuming from the largest offset.
>
> 1. Is it possible to specify that in the fromOffsets API? I don't want to
> use another API which returns JavaPairInputDStream, whereas the fromOffsets
> API returns JavaDStream, since I want to keep the further flow of my app the
> same in both cases.
>
>
> 2. To achieve the first point (same flow in both cases), if I use a different
> API in the two cases and convert the JavaPairInputDStream to a JavaDStream
> using a map function, I am no longer able to cast the transformed stream to
> HasOffsetRanges to get the offsets of the current run. It throws a class cast
> exception
> when I do
> OffsetRange[] offsetRanges = ((HasOffsetRanges)rdd.rdd()).offsetRanges();
> on the transformed stream:
>
> java.lang.ClassCastException: org.apache.spark.rdd.MapPartitionsRDD cannot
> be cast to org.apache.spark.streaming.kafka.HasOffsetRanges
>
>
>
>
> On Thu, Jul 30, 2015 at 7:58 PM, Cody Koeninger 
> wrote:
>
>> You can't use checkpoints across code upgrades.  That may or may not
>> change in the future, but for now that's a limitation of spark checkpoints
>> (regardless of whether you're using Kafka).
>>
>> Some options:
>>
>> - Start up the new job on a different cluster, then kill the old job once
>> it's caught up to where the new job started.  If you care about duplicate
>> work, you should be doing idempotent / transactional writes anyway, which
>> should take care of the overlap between the two.  If you're doing batches,
>> you may need to be a little more careful about handling batch boundaries
>>
>> - Store the offsets somewhere other than the checkpoint, and provide them
>> on startup using the fromOffsets argument to createDirectStream
>>
>>
>>
>>
>>
>> On Thu, Jul 30, 2015 at 4:07 AM, Nicola Ferraro 
>> wrote:
>>
>>> Hi,
>>> I've read about the recent updates about spark-streaming integration
>>> with Kafka (I refer to the new approach without receivers).
>>> In the new approach, metadata are persisted in checkpoint folders on
>>> HDFS so that the SparkStreaming context can be recreated in case of
>>> failures.
>>> This means that the streaming application will restart from where it
>>> exited and the message consuming process continues with new messages only.
>>> Also, if I manually stop the streaming process and recreate the context
>>> from checkpoint (using an approach similar to
>>> https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala),
>>> the behavior would be the same.
>>>
>>> Now, suppose I want to change something in the software and modify the
>>> processing pipeline.
>>> Can spark use the previous checkpoint to recreate the new application?
>>> Will I ever be able to upgrade the software without processing all the
>>> messages in Kafka again?
>>>
>>> Regards,
>>> Nicola
>>>
>>
>>
>


Re: Reliable Streaming Receiver

2015-08-06 Thread Cody Koeninger
You should be able to recompile the streaming-kafka project against 1.2,
let me know if you run into any issues.

From a usability standpoint, the only relevant thing I can think of that
was added after 1.2 was being able to get the partitionId off of the task
context... you can just use mapPartitionsWithIndex as a workaround

On Wed, Aug 5, 2015 at 8:18 PM, Sourabh Chandak 
wrote:

> Thanks Tathagata. I tried that but BlockGenerator internally uses
> SystemClock which is again private.
>
> We are using DSE so stuck with Spark 1.2 hence can't use the receiver-less
> version. Is it possible to use the same code as a separate API with 1.2?
>
> Thanks,
> Sourabh
>
> On Wed, Aug 5, 2015 at 6:13 PM, Tathagata Das  wrote:
>
>>  You could very easily strip out the BlockGenerator code from the Spark
>> source code and use it directly in the same way the Reliable Kafka Receiver
>> uses it. BTW, you should know that we will be deprecating the receiver
>> based approach for the Direct Kafka approach. That is quite flexible, can
>> give exactly-once guarantee without WAL, and is more robust and performant.
>> Consider using it.
>>
>>
>> On Wed, Aug 5, 2015 at 5:48 PM, Sourabh Chandak 
>> wrote:
>>
>>> Hi,
>>>
>>> I am trying to replicate the Kafka Streaming Receiver for a custom
>>> version of Kafka and want to create a Reliable receiver. The current
>>> implementation uses BlockGenerator which is a private class inside Spark
>>> streaming hence I can't use that in my code. Can someone help me with some
>>> resources to tackle this issue?
>>>
>>>
>>>
>>> Thanks,
>>> Sourabh
>>>
>>
>>
>


Re: Is it worth storing in ORC for one time read. And can be replace hive with HBase

2015-08-06 Thread venkatesh b
I'm really sorry, I posted this to the Spark mailing list by mistake.

Jörn Franke, thanks for your reply.
I have many joins and many complex queries, and all of them are table scans.
So I think HBase would not work for me.

On Thursday, August 6, 2015, Jörn Franke  wrote:

> Additionally it is of key importance to use the right data types for the
> columns. Use int for ids,  int or decimal or float or double etc for
> numeric values etc. - A bad data model using varchars and string where not
> appropriate is a significant bottleneck.
> Furthermore, include partition columns in join statements (not where),
> otherwise you do a full table scan ignoring partitions.
>
> On Thu, Aug 6, 2015 at 15:07, Jörn Franke wrote:
>
>> Yes you should use orc it is much faster and more compact. Additionally
>> you can apply compression (snappy) to increase performance. Your data
>> processing pipeline seems to be not very optimized. You should use the
>> newest hive version enabling storage indexes and bloom filters on
>> appropriate columns. Ideally you should insert the data sorted
>> appropriately. Partitioning and setting the execution engine to tez is also
>> beneficial.
>>
>> Hbase with phoenix should currently only be used if you do few joins, not
>> very complex queries and not many full table scans.
>>
>> On Thu, Aug 6, 2015 at 14:54, venkatesh b wrote:
>>
>>> Hi, here I got two things to know.
>>> FIRST:
>>> In our project we use hive.
>>> We daily get new data. We need to process this new data only once. And
>>> send this processed data to RDBMS. Here in processing we majorly use many
>>> complex queries with joins with where condition and grouping functions.
>>> There are many intermediate tables generated around 50 while
>>> processing. Till now we use text format as storage. We came across ORC file
>>> format. I would like to know that since it is one Time querying the table
>>> is it worth of storing as ORC format.
>>>
>>> SECOND:
>>> I came to know about HBase, which is faster.
>>> Can I replace hive with HBase for processing of data daily faster.
>>> Currently it is taking 15hrs daily with hive.
>>>
>>>
>>> Please inform me if any other information is needed.
>>>
>>> Thanks & regards
>>> Venkatesh
>>>
>>


SparkException: Yarn application has already ended

2015-08-06 Thread Clint McNeil
Hi

I am trying to launch a Spark application on a CM cluster and I get the
following error.

Exception in thread "main" org.apache.spark.SparkException: Yarn
application has already ended! It might have been killed or unable to
launch application master.

at
org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.waitForApplication(YarnClientSchedulerBackend.scala:113)

at
org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.start(YarnClientSchedulerBackend.scala:59)

at
org.apache.spark.scheduler.TaskSchedulerImpl.start(TaskSchedulerImpl.scala:141)

at org.apache.spark.SparkContext.<init>(SparkContext.scala:379)

What is the remedy for this type of problem?

-- 

*Clint McNeil*

BI & Data Science Engineer | Impact Radius

202 Suntyger, 313 Durban Road, Bellville, 7530

o: +2721 914-1764 | m: +2782 4796 309 |
cl...@impactradius.com



SparkR -Graphx

2015-08-06 Thread smagadi
I wanted to use GraphX from SparkR. Is there a way to do it? I think that as
of now it is not possible. I was wondering whether one could write a wrapper
in R that calls the Scala GraphX libraries.
Any thoughts on this, please?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/SparkR-Graphx-tp24152.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: Upgrade of Spark-Streaming application

2015-08-06 Thread Shushant Arora
Also, does the fromOffsets API fetch the last saved offset twice? Does the
fromOffsets API start from the map's Long value, or from that value + 1?
If it starts from the Long value itself, that message would be processed
twice: once in the application's last run before the crash, and once in the
first run after the crash.
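
A small sketch of the arithmetic in plain Python, under the assumption (which matches the documented OffsetRange semantics: fromOffset inclusive, untilOffset exclusive) that the offsets passed to fromOffsets are inclusive starting points. Whether a message is replayed then depends on which number was saved: the untilOffset of the last completed batch can be reused as-is, while the offset of the last processed message needs + 1. The concrete offsets and the helper name are made up for illustration.

```python
def consumed_offsets(from_offset, until_offset):
    """Offsets covered by a range that is inclusive at the start, exclusive at the end."""
    return list(range(from_offset, until_offset))

# Last batch before the crash, as a (fromOffset, untilOffset) pair.
last_range = (100, 105)
before_crash = consumed_offsets(*last_range)   # offsets 100 through 104

# Option A: the store holds untilOffset; pass it to fromOffsets unchanged.
resume_from_until = last_range[1]

# Option B: the store holds the last processed message offset; add 1.
last_processed = before_crash[-1]
resume_from_last = last_processed + 1

# First batch after the restart, assuming three new messages arrived.
after_restart = consumed_offsets(resume_from_until, resume_from_until + 3)
overlap = sorted(set(before_crash) & set(after_restart))
print(resume_from_until, resume_from_last, overlap)
```

Both options lead to the same starting point; passing the last processed offset itself, without the + 1, is the case that would read one message twice.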

On Thu, Aug 6, 2015 at 9:05 AM, Shushant Arora 
wrote:

> Hi
>
> For checkpointing and using the fromOffsets argument: say that when my app
> starts for the first time I don't have any previous state stored and I want
> to start consuming from the largest offset.
>
> 1. Is it possible to specify that in the fromOffsets API? I don't want to
> use another API which returns JavaPairInputDStream, whereas the fromOffsets
> API returns JavaDStream, since I want to keep the further flow of my app the
> same in both cases.
>
>
> 2. To achieve the first point (same flow in both cases), if I use a different
> API in the two cases and convert the JavaPairInputDStream to a JavaDStream
> using a map function, I am no longer able to cast the transformed stream to
> HasOffsetRanges to get the offsets of the current run. It throws a class cast
> exception
> when I do
> OffsetRange[] offsetRanges = ((HasOffsetRanges)rdd.rdd()).offsetRanges();
> on the transformed stream:
>
> java.lang.ClassCastException: org.apache.spark.rdd.MapPartitionsRDD cannot
> be cast to org.apache.spark.streaming.kafka.HasOffsetRanges
>
>
>
>
> On Thu, Jul 30, 2015 at 7:58 PM, Cody Koeninger 
> wrote:
>
>> You can't use checkpoints across code upgrades.  That may or may not
>> change in the future, but for now that's a limitation of spark checkpoints
>> (regardless of whether you're using Kafka).
>>
>> Some options:
>>
>> - Start up the new job on a different cluster, then kill the old job once
>> it's caught up to where the new job started.  If you care about duplicate
>> work, you should be doing idempotent / transactional writes anyway, which
>> should take care of the overlap between the two.  If you're doing batches,
>> you may need to be a little more careful about handling batch boundaries
>>
>> - Store the offsets somewhere other than the checkpoint, and provide them
>> on startup using the fromOffsets argument to createDirectStream
>>
>>
>>
>>
>>
>> On Thu, Jul 30, 2015 at 4:07 AM, Nicola Ferraro 
>> wrote:
>>
>>> Hi,
>>> I've read about the recent updates about spark-streaming integration
>>> with Kafka (I refer to the new approach without receivers).
>>> In the new approach, metadata are persisted in checkpoint folders on
>>> HDFS so that the SparkStreaming context can be recreated in case of
>>> failures.
>>> This means that the streaming application will restart from where it
>>> exited and the message consuming process continues with new messages only.
>>> Also, if I manually stop the streaming process and recreate the context
>>> from checkpoint (using an approach similar to
>>> https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala),
>>> the behavior would be the same.
>>>
>>> Now, suppose I want to change something in the software and modify the
>>> processing pipeline.
>>> Can spark use the previous checkpoint to recreate the new application?
>>> Will I ever be able to upgrade the software without processing all the
>>> messages in Kafka again?
>>>
>>> Regards,
>>> Nicola
>>>
>>
>>
>


Re: Is it worth storing in ORC for one time read. And can be replace hive with HBase

2015-08-06 Thread Jörn Franke
Additionally, it is of key importance to use the right data types for the
columns. Use int for ids, and int, decimal, float or double etc. for
numeric values. A bad data model using varchars and strings where not
appropriate is a significant bottleneck.
Furthermore, include partition columns in join statements (not where),
otherwise you do a full table scan ignoring partitions.

On Thu, Aug 6, 2015 at 15:07, Jörn Franke wrote:

> Yes you should use orc it is much faster and more compact. Additionally
> you can apply compression (snappy) to increase performance. Your data
> processing pipeline seems to be not very optimized. You should use the
> newest hive version enabling storage indexes and bloom filters on
> appropriate columns. Ideally you should insert the data sorted
> appropriately. Partitioning and setting the execution engine to tez is also
> beneficial.
>
> Hbase with phoenix should currently only be used if you do few joins, not
> very complex queries and not many full table scans.
>
> On Thu, Aug 6, 2015 at 14:54, venkatesh b wrote:
>
>> Hi, here I got two things to know.
>> FIRST:
>> In our project we use hive.
>> We daily get new data. We need to process this new data only once. And
>> send this processed data to RDBMS. Here in processing we majorly use many
>> complex queries with joins with where condition and grouping functions.
>> There are many intermediate tables generated around 50 while
>> processing. Till now we use text format as storage. We came across ORC file
>> format. I would like to know that since it is one Time querying the table
>> is it worth of storing as ORC format.
>>
>> SECOND:
>> I came to know about HBase, which is faster.
>> Can I replace hive with HBase for processing of data daily faster.
>> Currently it is taking 15hrs daily with hive.
>>
>>
>> Please inform me if any other information is needed.
>>
>> Thanks & regards
>> Venkatesh
>>
>


Re: Is it worth storing in ORC for one time read. And can be replace hive with HBase

2015-08-06 Thread Jörn Franke
Yes, you should use ORC; it is much faster and more compact. Additionally, you
can apply compression (Snappy) to increase performance. Your data
processing pipeline does not seem to be very optimized. You should use the
newest Hive version and enable storage indexes and bloom filters on
appropriate columns. Ideally you should insert the data sorted
appropriately. Partitioning and setting the execution engine to Tez are also
beneficial.

HBase with Phoenix should currently only be used if you do few joins, not
very complex queries and not many full table scans.

On Thu, Aug 6, 2015 at 14:54, venkatesh b wrote:

> Hi, here I got two things to know.
> FIRST:
> In our project we use hive.
> We daily get new data. We need to process this new data only once. And
> send this processed data to RDBMS. Here in processing we majorly use many
> complex queries with joins with where condition and grouping functions.
> There are many intermediate tables generated around 50 while
> processing. Till now we use text format as storage. We came across ORC file
> format. I would like to know that since it is one Time querying the table
> is it worth of storing as ORC format.
>
> SECOND:
> I came to know about HBase, which is faster.
> Can I replace hive with HBase for processing of data daily faster.
> Currently it is taking 15hrs daily with hive.
>
>
> Please inform me if any other information is needed.
>
> Thanks & regards
> Venkatesh
>


Re: Starting Spark SQL thrift server from within a streaming app

2015-08-06 Thread Todd Nist
Well, the point of creating a thrift server would be to allow external access
to the data from JDBC / ODBC type connections.  The sparkstreaming-sql project
leverages a standard Spark SQL context and then provides a means of
converting an incoming DStream into a row; look at the MessageToRow trait
in the KafkaSource class.

The example, org.apache.spark.sql.streaming.examples.KafkaDDL, should make
it clear, I think.

-Todd

On Thu, Aug 6, 2015 at 7:58 AM, Daniel Haviv <
daniel.ha...@veracity-group.com> wrote:

> Thank you Todd,
> How is the sparkstreaming-sql project different from starting a thrift
> server on a streaming app ?
>
> Thanks again.
> Daniel
>
>
> On Thu, Aug 6, 2015 at 1:53 AM, Todd Nist  wrote:
>
>> Hi Daniel,
>>
>> It is possible to create an instance of the SparkSQL Thrift server;
>> however, it seems like this project is what you may be looking for:
>>
>> https://github.com/Intel-bigdata/spark-streamingsql
>>
>> I'm not 100% sure what your use case is, but you can always convert the
>> data into a DataFrame and then issue a query against it.  If you want other
>> systems to be able to query it, there are numerous connectors to store
>> data into
>> Hive, Cassandra, HBase, ElasticSearch, 
>>
>> To create an instance of a thrift server with its own SQL Context you
>> would do something like the following:
>>
>> import org.apache.spark.{SparkConf, SparkContext}
>>
>> import org.apache.spark.sql.hive.HiveContext
>> import org.apache.spark.sql.hive.HiveMetastoreTypes._
>> import org.apache.spark.sql.types._
>> import org.apache.spark.sql.hive.thriftserver._
>>
>>
>> object MyThriftServer {
>>
>>   val sparkConf = new SparkConf()
>>     // master is passed to spark-submit, but could also be specified explicitly
>>     // .setMaster(sparkMaster)
>>     .setAppName("My ThriftServer")
>>     .set("spark.cores.max", "2")
>>   val sc = new SparkContext(sparkConf)
>>   val sparkContext = sc
>>   import sparkContext._
>>   val sqlContext = new HiveContext(sparkContext)
>>   import sqlContext._
>>   import sqlContext.implicits._
>>
>>   makeRDD((1, "hello") :: (2, "world") :: Nil).toDF.cache().registerTempTable("t")
>>
>>   HiveThriftServer2.startWithContext(sqlContext)
>> }
>>
>> Again, I'm not really clear what your use case is, but it does sound like
>> the first link above is what you may want.
>>
>> -Todd
>>
>> On Wed, Aug 5, 2015 at 1:57 PM, Daniel Haviv <
>> daniel.ha...@veracity-group.com> wrote:
>>
>>> Hi,
>>> Is it possible to start the Spark SQL thrift server from within a
>>> streaming app so the streamed data can be queried as it goes in?
>>>
>>> Thank you.
>>> Daniel
>>>
>>
>>
>


Is it worth storing in ORC for one time read. And can be replace hive with HBase

2015-08-06 Thread venkatesh b
Hi, I have two questions.

FIRST:
In our project we use Hive.
We get new data daily. We need to process this new data only once and send
the processed data to an RDBMS. The processing mostly uses many
complex queries with joins, where conditions and grouping functions.
Around 50 intermediate tables are generated during
processing. Until now we have used the text format for storage. We came
across the ORC file format. Since each table is queried only once, I would
like to know whether it is worth storing the data in ORC format.

SECOND:
I came to know about HBase, which is faster.
Can I replace Hive with HBase to make the daily processing faster?
Currently it takes 15 hours daily with Hive.


Please let me know if any other information is needed.

Thanks & regards
Venkatesh


Re: Combining Spark Files with saveAsTextFile

2015-08-06 Thread MEETHU MATHEW
Hi,
Try using coalesce(1) before calling saveAsTextFile().

Thanks & Regards,
Meethu M


 On Wednesday, 5 August 2015 7:53 AM, Brandon White 
 wrote:
   

 What is the best way to make saveAsTextFile save as only a single file?

  

Spark-submit not finding main class and the error reflects different path to jar file than specified

2015-08-06 Thread Stephen Boesch
Given the following command line to spark-submit:

bin/spark-submit --verbose --master local[2]--class
org.yardstick.spark.SparkCoreRDDBenchmark
/shared/ysgood/target/yardstick-spark-uber-0.0.1.jar

Here is the output:

NOTE: SPARK_PREPEND_CLASSES is set, placing locally compiled Spark classes
ahead of assembly.
Using properties file: /shared/spark-1.4.1/conf/spark-defaults.conf
Adding default property: spark.akka.askTimeout=180
Adding default property: spark.master=spark://mellyrn.local:7077
Error: Cannot load main class from JAR
file:/shared/spark-1.4.1/org.yardstick.spark.SparkCoreRDDBenchmark
Run with --help for usage help or --verbose for debug output


The path
"file:/shared/spark-1.4.1/org.yardstick.spark.SparkCoreRDDBenchmark" does
not seem to make sense. It does not reflect the path to the file that was
specified on the "spark-submit" command line.

Note: when attempting to run that jar file via

java -classpath shared/ysgood/target/yardstick-spark-uber-0.0.1.jar
org.yardstick.spark.SparkCoreRDDBenchmark

Then the result is as expected: the main class starts to load and then
there is a NoClassDefFoundError on SparkConf.class (which is not
inside the jar). This shows the app jar is healthy.
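
One possible reading, assuming the missing space between "local[2]" and "--class" is in the actual command and not just an artifact of the email: the shell then passes "local[2]--class" as a single token, which becomes the value of --master, and the class name is left as the first positional argument, where the application JAR is expected. The sketch below uses Python's shlex to show the tokenization. The tiny option parser is a hypothetical simplification, not spark-submit's real parser.

```python
import shlex

command = (
    "bin/spark-submit --verbose --master local[2]--class "
    "org.yardstick.spark.SparkCoreRDDBenchmark "
    "/shared/ysgood/target/yardstick-spark-uber-0.0.1.jar"
)
tokens = shlex.split(command)

# Hypothetical, simplified option model: options in TAKES_VALUE consume the
# next token, FLAGS consume none, and the first remaining token is treated
# as the application JAR.
TAKES_VALUE = {"--master", "--class"}
FLAGS = {"--verbose"}

def first_positional(args):
    i = 0
    while i < len(args):
        if args[i] in TAKES_VALUE:
            i += 2
        elif args[i] in FLAGS:
            i += 1
        else:
            return args[i]
    return None

primary = first_positional(tokens[1:])

# Same command with the space restored.
fixed_tokens = shlex.split(command.replace("local[2]--class", "local[2] --class"))
fixed_primary = first_positional(fixed_tokens[1:])

print(tokens[3])
print(primary)
print(fixed_primary)
```

Under this reading the class name is resolved as a file relative to the working directory, which would match the path in the error message; with the space restored, the JAR path is the first positional argument again.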


Re: Starting Spark SQL thrift server from within a streaming app

2015-08-06 Thread Daniel Haviv
Thank you Todd,
How is the sparkstreaming-sql project different from starting a thrift
server on a streaming app ?

Thanks again.
Daniel


On Thu, Aug 6, 2015 at 1:53 AM, Todd Nist  wrote:

> Hi Daniel,
>
> It is possible to create an instance of the SparkSQL Thrift server;
> however, it seems like this project is what you may be looking for:
>
> https://github.com/Intel-bigdata/spark-streamingsql
>
> I'm not 100% sure what your use case is, but you can always convert the
> data into a DataFrame and then issue a query against it.  If you want other
> systems to be able to query it, there are numerous connectors to store
> data into
> Hive, Cassandra, HBase, ElasticSearch, 
>
> To create an instance of a thrift server with its own SQL Context you would
> do something like the following:
>
> import org.apache.spark.{SparkConf, SparkContext}
>
> import org.apache.spark.sql.hive.HiveContext
> import org.apache.spark.sql.hive.HiveMetastoreTypes._
> import org.apache.spark.sql.types._
> import org.apache.spark.sql.hive.thriftserver._
>
>
> object MyThriftServer {
>
>   val sparkConf = new SparkConf()
>     // master is passed to spark-submit, but could also be specified explicitly
>     // .setMaster(sparkMaster)
>     .setAppName("My ThriftServer")
>     .set("spark.cores.max", "2")
>   val sc = new SparkContext(sparkConf)
>   val sparkContext = sc
>   import sparkContext._
>   val sqlContext = new HiveContext(sparkContext)
>   import sqlContext._
>   import sqlContext.implicits._
>
>   makeRDD((1, "hello") :: (2, "world") :: Nil).toDF.cache().registerTempTable("t")
>
>   HiveThriftServer2.startWithContext(sqlContext)
> }
>
> Again, I'm not really clear what your use case is, but it does sound like
> the first link above is what you may want.
>
> -Todd
>
> On Wed, Aug 5, 2015 at 1:57 PM, Daniel Haviv <
> daniel.ha...@veracity-group.com> wrote:
>
>> Hi,
>> Is it possible to start the Spark SQL thrift server from within a streaming
>> app so the streamed data can be queried as it goes in?
>>
>> Thank you.
>> Daniel
>>
>
>


Re: How can I know currently supported functions in Spark SQL

2015-08-06 Thread Todd Nist
They are covered here in the docs:

http://spark.apache.org/docs/1.4.1/api/scala/index.html#org.apache.spark.sql.functions$


On Thu, Aug 6, 2015 at 5:52 AM, Netwaver  wrote:

> Hi All,
>  I am using Spark 1.4.1, and I want to know how can I find the
> complete function list supported in Spark SQL, currently I only know
> 'sum','count','min','max'. Thanks a lot.
>
>
>


Re: How can I know currently supported functions in Spark SQL

2015-08-06 Thread Ted Yu
Have you looked at this?

http://spark.apache.org/docs/1.4.0/api/scala/index.html#org.apache.spark.sql.functions$



> On Aug 6, 2015, at 2:52 AM, Netwaver  wrote:
> 
> Hi All,
>  I am using Spark 1.4.1, and I want to know how can I find the 
> complete function list supported in Spark SQL, currently I only know 
> 'sum','count','min','max'. Thanks a lot.
> 
> 


Re: Memory allocation error with Spark 1.5

2015-08-06 Thread Alexis Seigneurin
Works like a charm. Thanks Reynold for the quick and efficient response!

Alexis

2015-08-05 19:19 GMT+02:00 Reynold Xin :

> In Spark 1.5, we have a new way to manage memory (part of Project
> Tungsten). The default unit of memory allocation is 64MB, which is way too
> high when you have 1G of memory allocated in total and have more than 4
> threads.
>
> We will reduce the default page size before releasing 1.5.  For now, you
> can just reduce spark.buffer.pageSize variable to a lower value (e.g. 16m).
>
>
> https://github.com/apache/spark/blob/702aa9d7fb16c98a50e046edfd76b8a7861d0391/sql/core/src/main/scala/org/apache/spark/sql/execution/sort.scala#L125
>
> On Wed, Aug 5, 2015 at 9:25 AM, Alexis Seigneurin 
> wrote:
>
>> Hi,
>>
>> I'm receiving a memory allocation error with a recent build of Spark 1.5:
>>
>> java.io.IOException: Unable to acquire 67108864 bytes of memory
>> at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.acquireNewPageIfNecessary(UnsafeExternalSorter.java:348)
>> at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.insertRecord(UnsafeExternalSorter.java:398)
>> at org.apache.spark.sql.execution.UnsafeExternalRowSorter.insertRow(UnsafeExternalRowSorter.java:92)
>> at org.apache.spark.sql.execution.UnsafeExternalRowSorter.sort(UnsafeExternalRowSorter.java:174)
>> at org.apache.spark.sql.execution.TungstenSort$$anonfun$doExecute$3.apply(sort.scala:146)
>> at org.apache.spark.sql.execution.TungstenSort$$anonfun$doExecute$3.apply(sort.scala:126)
>>
>>
>> The issue appears when joining 2 datasets. One with 6084 records, the
>> other one with 200 records. I'm expecting to receive 200 records in the
>> result.
>>
>> I'm using a homemade build prepared from "branch-1.5" with commit ID
>> "eedb996". I have run "mvn -DskipTests clean install" to generate that
>> build.
>>
>> Apart from that, I'm using Java 1.7.0_51 and Maven 3.3.3.
>>
>> I've prepared a test case that can be built and executed very easily
>> (data files are included in the repo):
>> https://github.com/aseigneurin/spark-testcase
>>
>> One thing to note is that the issue arises when the master is set to
>> "local[*]" but not when set to "local". Both options work without problem
>> with Spark 1.4, though.
>>
>> Any help will be greatly appreciated!
>>
>> Many thanks,
>> Alexis
>>
>
>


how to stop twitter-spark streaming

2015-08-06 Thread Sadaf
Hi All,
I am working with Spark Streaming and Twitter's user API.
I used this code to stop streaming:

ssc.addStreamingListener(new StreamingListener {
  var count = 1
  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) {
    count += 1
    if (count >= 5) {
      ssc.stop(true, true)
    }
  }
})
I also overrode the onStop method in my custom receiver to stop streaming, but it
gives the following exception:

java.lang.NullPointerException: Inflater has been closed
at java.util.zip.Inflater.ensureOpen(Inflater.java:389)
at java.util.zip.Inflater.inflate(Inflater.java:257)
at java.util.zip.InflaterInputStream.read(InflaterInputStream.java:152)
at java.util.zip.GZIPInputStream.read(GZIPInputStream.java:116)
at sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:283)
at sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:325)
at sun.nio.cs.StreamDecoder.read(StreamDecoder.java:177)
at java.io.InputStreamReader.read(InputStreamReader.java:184)
at java.io.BufferedReader.fill(BufferedReader.java:154)
at java.io.BufferedReader.readLine(BufferedReader.java:317)
at java.io.BufferedReader.readLine(BufferedReader.java:382)
at twitter4j.StatusStreamBase.handleNextElement(StatusStreamBase.java:85)
at twitter4j.StatusStreamImpl.next(StatusStreamImpl.java:57)
at twitter4j.TwitterStreamImpl$TwitterStreamConsumer.run(TwitterStreamImpl.java:478)
15/08/06 15:50:43 ERROR ReceiverTracker: Deregistered receiver for stream 0: Stopped by driver
15/08/06 15:50:43 WARN ReceiverSupervisorImpl: Stopped executor without error
15/08/06 15:50:50 WARN WriteAheadLogManager : Failed to write to write ahead log

Does anyone know the cause of this exception?

Thanks :)
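
The listener in this message calls ssc.stop from inside the batch-completed callback. One alternative, offered here only as a sketch and not as a confirmed fix for the exception shown, is to let the callback merely signal that the batch limit was reached and have the thread that started the streaming context perform the stop. The helper below shows just that signalling part in plain Java with a CountDownLatch. The class name BatchLimit and its methods are hypothetical, and the Spark calls appear only in comments.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper: counts completed batches and lets another thread wait for the limit. */
final class BatchLimit {
    private final CountDownLatch remaining;

    BatchLimit(int batches) {
        this.remaining = new CountDownLatch(batches);
    }

    /** Would be called from the listener's onBatchCompleted callback; never blocks. */
    void onBatchCompleted() {
        remaining.countDown();
    }

    /** Would be called from the driver's main thread; true once the limit has been reached. */
    boolean await(long timeout, TimeUnit unit) {
        try {
            return remaining.await(timeout, unit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return false;
        }
    }
}

// Assumed usage in the driver (not compiled here):
//   BatchLimit limit = new BatchLimit(4);
//   ... register a listener whose onBatchCompleted calls limit.onBatchCompleted() ...
//   ssc.start();
//   if (limit.await(10, TimeUnit.MINUTES)) { ssc.stop(true, true); }
```

The point of the design is that the callback returns immediately, so the stop is never executed on the thread that delivers listener events.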







Re: Re: Real-time data visualization with Zeppelin

2015-08-06 Thread andy petrella
Yep, most of the things will work just by renaming it :-D
You can even use nbconvert afterwards


On Thu, Aug 6, 2015 at 12:09 PM jun  wrote:

> Hi andy,
>
> Is there any method to convert ipython notebook file(.ipynb) to spark
> notebook file(.snb) or vice versa?
>
> BR
> Jun
>
> At 2015-07-13 02:45:57, "andy petrella"  wrote:
>
> Heya,
>
> You might be looking for something like this I guess:
> https://www.youtube.com/watch?v=kB4kRQRFAVc.
>
> The Spark-Notebook (https://github.com/andypetrella/spark-notebook/) can
> bring that to you actually, it uses fully reactive bilateral communication
> streams to update data and viz, plus it hides almost everything for you ^^.
> The video was using the notebook notebooks/streaming/Twitter stream.snb, so
> you can play it yourself if you like.
>
> You might want to build the master (before 0.6.0 is released → soon)
> here: http://spark-notebook.io/.
>
> HTH
> andy
>
>
>
> On Sun, Jul 12, 2015 at 8:29 PM Ruslan Dautkhanov 
> wrote:
>
>> Don't think it is a Zeppelin problem.. RDDs are "immutable".
>> Unless you integrate something like IndexedRDD
>> http://spark-packages.org/package/amplab/spark-indexedrdd
>> into Zeppelin I think it's not possible.
>>
>>
>> --
>> Ruslan Dautkhanov
>>
>> On Wed, Jul 8, 2015 at 3:24 PM, Brandon White 
>> wrote:
>>
>>> Can you use a cron job to update it every X minutes?
>>>
>>> On Wed, Jul 8, 2015 at 2:23 PM, Ganelin, Ilya <
>>> ilya.gane...@capitalone.com> wrote:
>>>
 Hi all – I’m just wondering if anyone has had success integrating Spark
 Streaming with Zeppelin and actually dynamically updating the data in near
 real-time. From my investigation, it seems that Zeppelin will only allow
 you to display a snapshot of data, not a continuously updating table. Has
 anyone figured out if there’s a way to loop a display command or how to
 provide a mechanism to continuously update visualizations?

 Thank you,
 Ilya Ganelin


>>>
>>>
>> --
andy


Re: spark job not accepting resources from worker

2015-08-06 Thread Kushal Chokhani
Figured out the root cause. The master was randomly assigning a port to the worker
for communication. Because of the firewall on the master, the worker couldn't
send messages to the master (such as resource details, perhaps). Oddly, the worker
didn't throw any error either.
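
A firewall problem like this can be checked independently of Spark by testing whether a TCP connection to a given host and port can be opened from the machine in question. The following is a minimal sketch using only the JDK. The class name PortCheck and the timeout are assumptions made for illustration, and the host and port to test would be whatever addresses show up in your own master and worker logs.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

/** Hypothetical diagnostic helper, not part of Spark. */
final class PortCheck {
    /** Returns true if a TCP connection to host:port succeeds within the timeout. */
    static boolean canConnect(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused, timed out, or host unreachable.
            return false;
        }
    }

    public static void main(String[] args) {
        // Example: java PortCheck 192.168.4.134 7077
        String host = args[0];
        int port = Integer.parseInt(args[1]);
        System.out.println(host + ":" + port + " reachable = " + canConnect(host, port, 2000));
    }
}
```

Running it from the worker against the master's address, and from the master against the worker's, shows in which direction traffic is blocked. A timeout rather than an immediate refusal often points to a firewall silently dropping packets.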


On 8/6/2015 3:24 PM, Kushal Chokhani wrote:

Any inputs?

For the following message, is there a way to check through the logs which
resources are insufficient?


[Timer-0] WARN org.apache.spark.scheduler.TaskSchedulerImpl  -
Initial job has not accepted any resources; check your cluster UI
to ensure that workers are registered and have sufficient resources

Regards.

On 8/6/2015 11:40 AM, Kushal Chokhani wrote:

Hi

I have a Spark/Cassandra setup where I am using the Spark Cassandra
Java connector to query a table. So far, I have 1 Spark master
node (2 cores) and 1 worker node (4 cores). Both of them have the
following spark-env.sh under conf/:


#!/usr/bin/env bash
export SPARK_LOCAL_IP=127.0.0.1
export SPARK_MASTER_IP="192.168.4.134"
export SPARK_WORKER_MEMORY=1G
export SPARK_EXECUTOR_MEMORY=2G

I am using Spark 1.4.1 along with Cassandra 2.2.0. I have started my
Cassandra/Spark setup, created a keyspace and a table in Cassandra, and
added some rows to the table. Now I am trying to run the following Spark job
using the Spark Cassandra Java connector:


 SparkConf conf = new SparkConf();
 conf.setAppName("Testing");
 conf.setMaster("spark://192.168.4.134:7077");
 conf.set("spark.cassandra.connection.host", "192.168.4.129");
 conf.set("spark.logConf", "true");
 conf.set("spark.driver.maxResultSize", "50m");
 conf.set("spark.executor.memory", "200m");
 conf.set("spark.eventLog.enabled", "true");
 conf.set("spark.eventLog.dir", "/tmp/");
 conf.set("spark.executor.extraClassPath", "/home/enlighted/ebd.jar");
 conf.set("spark.cores.max", "1");
 JavaSparkContext sc = new JavaSparkContext(conf);

 JavaRDD<String> cassandraRowsRDD =
     CassandraJavaUtil.javaFunctions(sc).cassandraTable("testing", "ec")
         .map(new Function<CassandraRow, String>() {
             private static final long serialVersionUID = -6263533266898869895L;
             @Override
             public String call(CassandraRow cassandraRow) throws Exception {
                 return cassandraRow.toString();
             }
         });
 System.out.println("Data as CassandraRows: \n" +
     StringUtils.join(cassandraRowsRDD.toArray(), "\n"));
 sc.close();


This job is stuck with an insufficient-resources warning. Here are the logs:

1107 [main] INFO org.apache.spark.SparkContext  - Spark
configuration:
spark.app.name=Testing
spark.cassandra.connection.host=192.168.4.129
spark.cores.max=1
spark.driver.maxResultSize=50m
spark.eventLog.dir=/tmp/
spark.eventLog.enabled=true
spark.executor.extraClassPath=/home/enlighted/ebd.jar
spark.executor.memory=200m
spark.logConf=true
spark.master=spark://192.168.4.134:7077
1121 [main] INFO org.apache.spark.SecurityManager  - Changing
view acls to: enlighted
1122 [main] INFO org.apache.spark.SecurityManager  - Changing
modify acls to: enlighted
1123 [main] INFO org.apache.spark.SecurityManager  -
SecurityManager: authentication disabled; ui acls disabled; users
with view permissions: Set(enlighted); users with modify
permissions: Set(enlighted)
1767 [sparkDriver-akka.actor.default-dispatcher-4] INFO
akka.event.slf4j.Slf4jLogger  - Slf4jLogger started
1805 [sparkDriver-akka.actor.default-dispatcher-4] INFO Remoting 
- Starting remoting

1957 [main] INFO org.apache.spark.util.Utils  - Successfully
started service 'sparkDriver' on port 54611.
1958 [sparkDriver-akka.actor.default-dispatcher-4] INFO Remoting 
- Remoting started; listening on addresses

:[akka.tcp://sparkDriver@192.168.4.134:54611]
1977 [main] INFO org.apache.spark.SparkEnv  - Registering
MapOutputTracker
1989 [main] INFO org.apache.spark.SparkEnv  - Registering
BlockManagerMaster
2007 [main] INFO org.apache.spark.storage.DiskBlockManager  -
Created local directory at

/tmp/spark-f21125fd-ae9d-460e-884d-563fa8720f09/blockmgr-3e3d54e7-16df-4e97-be48-b0c0fa0389e7
2012 [main] INFO org.apache.spark.storage.MemoryStore  -
MemoryStore started with capacity 456.0 MB
2044 [main] INFO org.apache.spark.HttpFileServer  - HTTP File
server directory is

/tmp/spark-f21125fd-ae9d-460e-884d-563fa8720f09/httpd-64b4d92e-cde9-45fb-8b38-edc3cca3933c
2046 [main] INFO org.apache.spark.HttpServer  - Starting HTTP Server
2086 [main] INFO org.spark-project.jetty.server.Server  -
jetty-8.y.z-SNAPSHOT
2098 [main] INFO
org.spark-project.jetty.server.AbstractConnector  - Started
SocketConnector@0.0.0.0:44884
2099 [main] INFO org.apache.spark.util.Utils  - Successfully
started service 'HTTP file server' on port 44884.
2108 [main] INFO org.apache.spark.SparkEnv  - Registering
OutputCommitCoordinat

Re:Re: Real-time data visualization with Zeppelin

2015-08-06 Thread jun
Hi andy,


Is there any method to convert ipython notebook file(.ipynb) to spark notebook 
file(.snb) or vice versa?


BR
Jun 

At 2015-07-13 02:45:57, "andy petrella"  wrote:

Heya,


You might be looking for something like this I guess: 
https://www.youtube.com/watch?v=kB4kRQRFAVc.


The Spark-Notebook (https://github.com/andypetrella/spark-notebook/) can bring 
that to you actually, it uses fully reactive bilateral communication streams to 
update data and viz, plus it hides almost everything for you ^^. The video was 
using the notebook notebooks/streaming/Twitter stream.snb so you can play it 
yourself if you like.


You might want to build the master (before 0.6.0 is released → soon) here:
http://spark-notebook.io/.


HTH
andy






On Sun, Jul 12, 2015 at 8:29 PM Ruslan Dautkhanov  wrote:

Don't think it is a Zeppelin problem.. RDDs are "immutable".
Unless you integrate something like IndexedRDD 
http://spark-packages.org/package/amplab/spark-indexedrdd
into Zeppelin I think it's not possible.



--
Ruslan Dautkhanov



On Wed, Jul 8, 2015 at 3:24 PM, Brandon White  wrote:

Can you use a cron job to update it every X minutes?


On Wed, Jul 8, 2015 at 2:23 PM, Ganelin, Ilya  
wrote:

Hi all – I’m just wondering if anyone has had success integrating Spark 
Streaming with Zeppelin and actually dynamically updating the data in near 
real-time. From my investigation, it seems that Zeppelin will only allow you to 
display a snapshot of data, not a continuously updating table. Has anyone 
figured out if there’s a way to loop a display command or how to provide a 
mechanism to continuously update visualizations? 


Thank you, 
Ilya Ganelin











Re: spark job not accepting resources from worker

2015-08-06 Thread Kushal Chokhani

Any inputs?

For the following message, is there a way to check through the logs which
resources are insufficient?


   [Timer-0] WARN org.apache.spark.scheduler.TaskSchedulerImpl  -
   Initial job has not accepted any resources; check your cluster UI to
   ensure that workers are registered and have sufficient resources

Regards.

On 8/6/2015 11:40 AM, Kushal Chokhani wrote:

Hi

I have a Spark/Cassandra setup where I am using the Spark Cassandra Java
connector to query a table. So far, I have 1 Spark master node (2
cores) and 1 worker node (4 cores). Both of them have the following
spark-env.sh under conf/:


#!/usr/bin/env bash
export SPARK_LOCAL_IP=127.0.0.1
export SPARK_MASTER_IP="192.168.4.134"
export SPARK_WORKER_MEMORY=1G
export SPARK_EXECUTOR_MEMORY=2G

I am using Spark 1.4.1 along with Cassandra 2.2.0. I have started my
Cassandra/Spark setup, created a keyspace and a table in Cassandra, and
added some rows to the table. Now I am trying to run the following Spark job
using the Spark Cassandra Java connector:


 SparkConf conf = new SparkConf();
 conf.setAppName("Testing");
 conf.setMaster("spark://192.168.4.134:7077");
 conf.set("spark.cassandra.connection.host", "192.168.4.129");
 conf.set("spark.logConf", "true");
 conf.set("spark.driver.maxResultSize", "50m");
 conf.set("spark.executor.memory", "200m");
 conf.set("spark.eventLog.enabled", "true");
 conf.set("spark.eventLog.dir", "/tmp/");
 conf.set("spark.executor.extraClassPath", "/home/enlighted/ebd.jar");
 conf.set("spark.cores.max", "1");
 JavaSparkContext sc = new JavaSparkContext(conf);

 JavaRDD<String> cassandraRowsRDD =
     CassandraJavaUtil.javaFunctions(sc).cassandraTable("testing", "ec")
         .map(new Function<CassandraRow, String>() {
             private static final long serialVersionUID = -6263533266898869895L;
             @Override
             public String call(CassandraRow cassandraRow) throws Exception {
                 return cassandraRow.toString();
             }
         });
 System.out.println("Data as CassandraRows: \n" +
     StringUtils.join(cassandraRowsRDD.toArray(), "\n"));
 sc.close();


This job is stuck with an insufficient-resources warning. Here are the logs:

1107 [main] INFO org.apache.spark.SparkContext  - Spark configuration:
spark.app.name=Testing
spark.cassandra.connection.host=192.168.4.129
spark.cores.max=1
spark.driver.maxResultSize=50m
spark.eventLog.dir=/tmp/
spark.eventLog.enabled=true
spark.executor.extraClassPath=/home/enlighted/ebd.jar
spark.executor.memory=200m
spark.logConf=true
spark.master=spark://192.168.4.134:7077
1121 [main] INFO org.apache.spark.SecurityManager  - Changing view
acls to: enlighted
1122 [main] INFO org.apache.spark.SecurityManager  - Changing
modify acls to: enlighted
1123 [main] INFO org.apache.spark.SecurityManager  -
SecurityManager: authentication disabled; ui acls disabled; users
with view permissions: Set(enlighted); users with modify
permissions: Set(enlighted)
1767 [sparkDriver-akka.actor.default-dispatcher-4] INFO
akka.event.slf4j.Slf4jLogger  - Slf4jLogger started
1805 [sparkDriver-akka.actor.default-dispatcher-4] INFO Remoting 
- Starting remoting

1957 [main] INFO org.apache.spark.util.Utils  - Successfully
started service 'sparkDriver' on port 54611.
1958 [sparkDriver-akka.actor.default-dispatcher-4] INFO Remoting 
- Remoting started; listening on addresses

:[akka.tcp://sparkDriver@192.168.4.134:54611]
1977 [main] INFO org.apache.spark.SparkEnv  - Registering
MapOutputTracker
1989 [main] INFO org.apache.spark.SparkEnv  - Registering
BlockManagerMaster
2007 [main] INFO org.apache.spark.storage.DiskBlockManager  -
Created local directory at

/tmp/spark-f21125fd-ae9d-460e-884d-563fa8720f09/blockmgr-3e3d54e7-16df-4e97-be48-b0c0fa0389e7
2012 [main] INFO org.apache.spark.storage.MemoryStore  -
MemoryStore started with capacity 456.0 MB
2044 [main] INFO org.apache.spark.HttpFileServer  - HTTP File
server directory is

/tmp/spark-f21125fd-ae9d-460e-884d-563fa8720f09/httpd-64b4d92e-cde9-45fb-8b38-edc3cca3933c
2046 [main] INFO org.apache.spark.HttpServer  - Starting HTTP Server
2086 [main] INFO org.spark-project.jetty.server.Server  -
jetty-8.y.z-SNAPSHOT
2098 [main] INFO org.spark-project.jetty.server.AbstractConnector 
- Started SocketConnector@0.0.0.0:44884

2099 [main] INFO org.apache.spark.util.Utils  - Successfully
started service 'HTTP file server' on port 44884.
2108 [main] INFO org.apache.spark.SparkEnv  - Registering
OutputCommitCoordinator
2297 [main] INFO org.spark-project.jetty.server.Server  -
jetty-8.y.z-SNAPSHOT
2317 [main] INFO org.spark-project.jetty.server.AbstractConnector 
- Started SelectChannelConnector@0.0.0.0:4040

2318 [main] INFO org.apache.spark.util.Utils  - Successfully
started service 'SparkUI' on port 404

How can I know currently supported functions in Spark SQL

2015-08-06 Thread Netwaver
Hi All,
 I am using Spark 1.4.1, and I want to know how I can find the complete
list of functions supported in Spark SQL. Currently I only know
'sum', 'count', 'min', 'max'. Thanks a lot.


Re: Pause Spark Streaming reading or sampling streaming data

2015-08-06 Thread Dimitris Kouzis - Loukas
Re-reading your description, I guess you could potentially make your input
source connect for 10 seconds, pause for 50, and then reconnect.
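
The connect-for-10-seconds, pause-for-50 idea can be sketched as a small time gate that a custom receiver could consult before connecting or storing records. This is a minimal sketch and not code from this thread: the class name SamplingWindow, its parameters, and the choice to align windows to the epoch are all assumptions made for illustration.

```java
import java.time.Duration;

/** Hypothetical helper: is a timestamp inside the "read" part of each period? */
final class SamplingWindow {
    private final long periodMillis;
    private final long openMillis;

    SamplingWindow(Duration period, Duration open) {
        this.periodMillis = period.toMillis();
        this.openMillis = open.toMillis();
        if (periodMillis <= 0 || openMillis < 0 || openMillis > periodMillis) {
            throw new IllegalArgumentException("need period > 0 and 0 <= open <= period");
        }
    }

    /** True during the first `open` part of every period, e.g. seconds 0-9 of each minute. */
    boolean isOpen(long epochMillis) {
        return Math.floorMod(epochMillis, periodMillis) < openMillis;
    }

    /** Milliseconds to wait until the next window opens; 0 if a window is open now. */
    long millisUntilOpen(long epochMillis) {
        if (isOpen(epochMillis)) {
            return 0L;
        }
        return periodMillis - Math.floorMod(epochMillis, periodMillis);
    }
}
```

A receiver loop could call isOpen(System.currentTimeMillis()) before each read, and otherwise disconnect and sleep for millisUntilOpen(...). Dropping data this early means the unwanted 50 seconds are never handed to the DStream at all.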

On Thu, Aug 6, 2015 at 10:32 AM, Dimitris Kouzis - Loukas  wrote:

> Hi, yes, it's great that you wrote it yourself; it means you have more
> control. I have the feeling that the most efficient point to discard as
> much data as possible is your Spark input source, or you could even modify
> your subscription protocol so that it does not receive the other 50 seconds
> of data at all. After you deliver data to the DStream you can filter
> it as much as you want, but you will still be subject to garbage
> collection and/or potential shuffles and HDD checkpoints.
>
> On Thu, Aug 6, 2015 at 1:31 AM, Heath Guo  wrote:
>
>> Hi Dimitris,
>>
>> Thanks for your reply. Just wondering – are you asking about my streaming
>> input source? I implemented a custom receiver and have been using that.
>> Thanks.
>>
>> From: Dimitris Kouzis - Loukas 
>> Date: Wednesday, August 5, 2015 at 5:27 PM
>> To: Heath Guo 
>> Cc: "user@spark.apache.org" 
>> Subject: Re: Pause Spark Streaming reading or sampling streaming data
>>
>> What driver do you use? Sounds like something you should do before the
>> driver...
>>
>> On Thu, Aug 6, 2015 at 12:50 AM, Heath Guo  wrote:
>>
>>> Hi, I have a question about sampling Spark Streaming data, or getting
>>> part of the data. For every minute, I only want the data read in during the
>>> first 10 seconds, and discard all data in the next 50 seconds. Is there any
>>> way to pause reading and discard data in that period? I'm doing this to
>>> sample from a stream of huge amount of data, which saves processing time in
>>> the real-time program. Thanks!
>>>
>>
>>
>


Re: Pause Spark Streaming reading or sampling streaming data

2015-08-06 Thread Dimitris Kouzis - Loukas
Hi, yes, it's great that you wrote it yourself; it means you have more
control. I have the feeling that the most efficient point to discard as
much data as possible is your Spark input source, or you could even modify
your subscription protocol so that it does not receive the other 50 seconds
of data at all. After you deliver data to the DStream you can filter
it as much as you want, but you will still be subject to garbage
collection and/or potential shuffles and HDD checkpoints.

On Thu, Aug 6, 2015 at 1:31 AM, Heath Guo  wrote:

> Hi Dimitris,
>
> Thanks for your reply. Just wondering – are you asking about my streaming
> input source? I implemented a custom receiver and have been using that.
> Thanks.
>
> From: Dimitris Kouzis - Loukas 
> Date: Wednesday, August 5, 2015 at 5:27 PM
> To: Heath Guo 
> Cc: "user@spark.apache.org" 
> Subject: Re: Pause Spark Streaming reading or sampling streaming data
>
> What driver do you use? Sounds like something you should do before the
> driver...
>
> On Thu, Aug 6, 2015 at 12:50 AM, Heath Guo  wrote:
>
>> Hi, I have a question about sampling Spark Streaming data, or getting
>> part of the data. For every minute, I only want the data read in during the
>> first 10 seconds, and discard all data in the next 50 seconds. Is there any
>> way to pause reading and discard data in that period? I'm doing this to
>> sample from a stream of huge amount of data, which saves processing time in
>> the real-time program. Thanks!
>>
>
>


Re: Is there any way to support multiple users executing SQL on thrift server?

2015-08-06 Thread Ted Yu
What is the JIRA number, if a JIRA has been logged for this?

Thanks



> On Jan 20, 2015, at 11:30 AM, Cheng Lian  wrote:
> 
> Hey Yi,
> 
> I'm quite unfamiliar with Hadoop/HDFS auth mechanisms for now, but would like
> to investigate this issue later. Would you please open a JIRA for it? Thanks!
> 
> Cheng
> 
>> On 1/19/15 1:00 AM, Yi Tian wrote:
>> Is there any way to support multiple users executing SQL on one thrift 
>> server?
>> 
>> I think there are some problems for spark 1.2.0, for example:
>> 
>> Start thrift server with user A
>> Connect to thrift server via beeline with user B
>> Execute “insert into table dest select … from table src”
>> then we found these items on hdfs:
>> 
>> drwxr-xr-x   - B supergroup  0 2015-01-16 16:42 
>> /tmp/hadoop/hive_2015-01-16_16-42-48_923_1860943684064616152-3/-ext-1
>> drwxr-xr-x   - B supergroup  0 2015-01-16 16:42 
>> /tmp/hadoop/hive_2015-01-16_16-42-48_923_1860943684064616152-3/-ext-1/_temporary
>> drwxr-xr-x   - B supergroup  0 2015-01-16 16:42 
>> /tmp/hadoop/hive_2015-01-16_16-42-48_923_1860943684064616152-3/-ext-1/_temporary/0
>> drwxr-xr-x   - A supergroup  0 2015-01-16 16:42 
>> /tmp/hadoop/hive_2015-01-16_16-42-48_923_1860943684064616152-3/-ext-1/_temporary/0/_temporary
>> drwxr-xr-x   - A supergroup  0 2015-01-16 16:42 
>> /tmp/hadoop/hive_2015-01-16_16-42-48_923_1860943684064616152-3/-ext-1/_temporary/0/task_201501161642_0022_m_00
>> -rw-r--r--   3 A supergroup   2671 2015-01-16 16:42 
>> /tmp/hadoop/hive_2015-01-16_16-42-48_923_1860943684064616152-3/-ext-1/_temporary/0/task_201501161642_0022_m_00/part-0
>> You can see that all the temporary paths created on the driver side (thrift server
>> side) are owned by user B (which is what we expected).
>>
>> But all the output data created on the executor side is owned by user A (which
>> is NOT what we expected).
>> The wrong owner of the output data causes an
>> org.apache.hadoop.security.AccessControlException when the driver side
>> moves the output data into the dest table.
>>
>> Does anyone know how to resolve this problem?
>> 
> 


Re: Enum values in custom objects mess up RDD operations

2015-08-06 Thread Sebastian Kalix
Thanks a lot Igor,

the following hashCode function is stable:

@Override
public int hashCode() {
    int hash = 5;
    hash = 41 * hash + this.myEnum.ordinal();
    return hash;
}

For anyone having the same problem:
http://tech.technoflirt.com/2014/08/21/issues-with-java-enum-hashcode/


Cheers,

Sebastian

Igor Berman  schrieb am Do., 6. Aug. 2015 um
10:59 Uhr:

> An enum's hashCode is JVM-instance specific (i.e. different JVMs will give you
> different values), so you can use the ordinal in the hashCode computation, or use
> the hashCode of the enum's ordinal as part of the hashCode computation.
>
> On 6 August 2015 at 11:41, Warfish  wrote:
>
>> Hi everyone,
>>
>> I have been working with Spark for a little while now and have encountered a
>> very strange behaviour that caused me a lot of headaches:
>>
>> I have written my own POJOs to encapsulate my data and this data is held in
>> some JavaRDDs. Part of these POJOs is a member variable of a custom enum
>> type. Whenever I do some operations on these RDDs such as subtract,
>> groupByKey, reduce or similar things, the results are inconsistent and
>> nonsensical. However, this happens only when the application runs in
>> standalone cluster mode (10 nodes). When running locally on my developer
>> machine, the code executes just fine. If you want to reproduce this
>> behaviour, here
>> <http://apache-spark-user-list.1001560.n3.nabble.com/file/n24149/SparkTest.zip>
>> is the complete Maven project that you can run out of the box. I am running
>> Spark 1.4.0 and submitting the application using
>> /usr/local/spark-1.4.0-bin-hadoop2.4/bin/spark-submit --class
>> de.spark.test.Main ./SparkTest-1.0-SNAPSHOT.jar
>>
>>
>>
>> Consider the following code for my custom object:
>>
>>
>> package de.spark.test;
>>
>> import java.io.Serializable;
>> import java.util.Objects;
>>
>> public class MyObject implements Serializable {
>>
>> private MyEnum myEnum;
>>
>> public MyObject(MyEnum myEnum) {
>> this.myEnum = myEnum;
>> }
>>
>> public MyEnum getMyEnum() {
>> return myEnum;
>> }
>>
>> public void setMyEnum(MyEnum myEnum) {
>> this.myEnum = myEnum;
>> }
>>
>> @Override
>> public int hashCode() {
>> int hash = 5;
>> hash = 41 * hash + Objects.hashCode(this.myEnum);
>> return hash;
>> }
>>
>> @Override
>> public boolean equals(Object obj) {
>> if (obj == null) {
>> return false;
>> }
>> if (getClass() != obj.getClass()) {
>> return false;
>> }
>> final MyObject other = (MyObject) obj;
>> if (this.myEnum != other.myEnum) {
>> return false;
>> }
>> return true;
>> }
>>
>> @Override
>> public String toString() {
>> return "MyObject{" + "myEnum=" + myEnum + '}';
>> }
>>
>> }
>>
>>
>> As you can see, I have overridden equals() and hashCode() (both are
>> auto-generated). The enum is given as follows:
>>
>>
>> package de.spark.test;
>>
>> import java.io.Serializable;
>>
>> public enum MyEnum implements Serializable {
>>   VALUE1, VALUE2
>> }
>>
>>
>> The main() method is defined by:
>>
>>
>> package de.spark.test;
>>
>> import java.util.ArrayList;
>> import java.util.List;
>> import org.apache.spark.SparkConf;
>> import org.apache.spark.api.java.JavaRDD;
>> import org.apache.spark.api.java.JavaSparkContext;
>>
>> public class Main {
>>
>>   public static void main(String[] args) {
>> SparkConf conf = new SparkConf().setAppName("Spark Test")
>> .setMaster("myMaster");
>>
>> JavaSparkContext jsc = new JavaSparkContext(conf);
>>
>>
>> System.out.println("///
>> Object generation");
>>
>> List<MyObject> l1 = new ArrayList<>();
>>
>> for(int i = 0; i < 1000; i++) {
>>     l1.add(new MyObject(MyEnum.VALUE1));
>> }
>>
>> JavaRDD<MyObject> myObjectRDD1 = jsc.parallelize(l1);
>> JavaRDD<MyObject> myObjectRDD2 = jsc.parallelize(l1);
>>
>> System.out.println("myObjectRDD1 count  = " +
>> myObjectRDD1.count());
>> System.out.println("myObjectRDD2 count  = " +
>> myObjectRDD2.count());
>>
>>
>> System.out.println("///
>> Distinct");
>>
>> JavaRDD<MyObject> myObjectRDD1Distinct = myObjectRDD1.distinct();
>> JavaRDD<MyObject> myObjectRDD2Distinct = myObjectRDD2.distinct();
>>
>> System.out.println("myObjectRDD1Distinct count  = " +
>> myObjectRDD1Distinct.count());
>> System.out.println("myObjectRDD2Distinct count  = " +
>> myObjectRDD2Distinct.count());
>>
>>
>> System.out.println("///
>> Subtract");
>>
>> JavaRDD<MyObject> myObjectRDD1Minus1 =
>>     myObjectRDD1.subtract(myObjectRDD1);
>> JavaRDD<MyObject> myObjectRDD1Minus2 =
>>     myObjectRDD1.subtract(myObjectRDD2);
>> JavaRDD<MyObject> myObjectRDD2Minus1 =
>>     myObjectRDD2.subtract(myObjectRDD1);
>>
>> System.out.println("myObjectRDD

Re: subscribe

2015-08-06 Thread Ted Yu
See http://spark.apache.org/community.html

Cheers



> On Aug 5, 2015, at 10:51 PM, Franc Carter  
> wrote:
> 
> subscribe


Re: Enum values in custom objects mess up RDD operations

2015-08-06 Thread Igor Berman
An enum's hashCode is JVM-instance specific (i.e. different JVMs will give you
different values), so you can use the ordinal in the hashCode computation, or use
the hashCode of the enum's ordinal as part of the hashCode computation.
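
To see why an unstable hash breaks operations such as distinct() and subtract(), recall that hash-based partitioning picks a partition from the record's hashCode modulo the number of partitions, so equal records only meet if every JVM computes the same hash for them. The sketch below imitates that step in plain Java. The helper partitionFor is hypothetical, written for this illustration, and the two hash values standing in for "the same enum constant seen on two JVMs" are made-up numbers.

```java
/** Hypothetical demo of hash-based partition assignment; not Spark code. */
final class PartitionDemo {
    /** Non-negative modulo of a hash code, as a hash partitioner would compute it. */
    static int partitionFor(int hashCode, int numPartitions) {
        return Math.floorMod(hashCode, numPartitions);
    }

    public static void main(String[] args) {
        int partitions = 4;

        // Made-up identity hashes for the "same" object on two different JVMs.
        int hashOnJvmA = 0x1b6d3586;
        int hashOnJvmB = 0x4554617c;
        System.out.println(partitionFor(hashOnJvmA, partitions)); // prints 2
        System.out.println(partitionFor(hashOnJvmB, partitions)); // prints 0

        // An ordinal-based hash (41 * 5 + ordinal 0) is identical everywhere.
        int stableHash = 41 * 5 + 0;
        System.out.println(partitionFor(stableHash, partitions)); // prints 1
    }
}
```

With the unstable hashes, two equal objects produced on different executors land in different partitions and are never compared, which is consistent with the inflated distinct() counts and non-empty subtract() results described in this thread. It would also explain why a single local JVM does not show the problem.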

On 6 August 2015 at 11:41, Warfish  wrote:

> Hi everyone,
>
> I have been working with Spark for a little while now and have encountered a very
> strange behaviour that caused me a lot of headaches:
>
> I have written my own POJOs to encapsulate my data and this data is held in
> some JavaRDDs. Part of these POJOs is a member variable of a custom enum
> type. Whenever I do some operations on these RDDs such as subtract,
> groupByKey, reduce or similar things, the results are inconsistent and
> nonsensical. However, this happens only when the application runs in
> standalone cluster mode (10 nodes). When running locally on my developer
> machine, the code executes just fine. If you want to reproduce this
> behaviour, here
> <http://apache-spark-user-list.1001560.n3.nabble.com/file/n24149/SparkTest.zip>
> is the complete Maven project that you can run out of the box. I am running
> Spark 1.4.0 and submitting the application using
> /usr/local/spark-1.4.0-bin-hadoop2.4/bin/spark-submit --class
> de.spark.test.Main ./SparkTest-1.0-SNAPSHOT.jar
>
>
>
> Consider the following code for my custom object:
>
>
> package de.spark.test;
>
> import java.io.Serializable;
> import java.util.Objects;
>
> public class MyObject implements Serializable {
>
> private MyEnum myEnum;
>
> public MyObject(MyEnum myEnum) {
> this.myEnum = myEnum;
> }
>
> public MyEnum getMyEnum() {
> return myEnum;
> }
>
> public void setMyEnum(MyEnum myEnum) {
> this.myEnum = myEnum;
> }
>
> @Override
> public int hashCode() {
> int hash = 5;
> hash = 41 * hash + Objects.hashCode(this.myEnum);
> return hash;
> }
>
> @Override
> public boolean equals(Object obj) {
> if (obj == null) {
> return false;
> }
> if (getClass() != obj.getClass()) {
> return false;
> }
> final MyObject other = (MyObject) obj;
> if (this.myEnum != other.myEnum) {
> return false;
> }
> return true;
> }
>
> @Override
> public String toString() {
> return "MyObject{" + "myEnum=" + myEnum + '}';
> }
>
> }
>
>
> As you can see, I have overridden equals() and hashCode() (both are
> auto-generated). The enum is given as follows:
>
>
> package de.spark.test;
>
> import java.io.Serializable;
>
> public enum MyEnum implements Serializable {
>   VALUE1, VALUE2
> }
>
>
> The main() method is defined by:
>
>
> package de.spark.test;
>
> import java.util.ArrayList;
> import java.util.List;
> import org.apache.spark.SparkConf;
> import org.apache.spark.api.java.JavaRDD;
> import org.apache.spark.api.java.JavaSparkContext;
>
> public class Main {
>
>   public static void main(String[] args) {
> SparkConf conf = new SparkConf().setAppName("Spark Test")
> .setMaster("myMaster");
>
> JavaSparkContext jsc = new JavaSparkContext(conf);
>
> System.out.println("///
> Object generation");
>
> List<MyObject> l1 = new ArrayList<>();
>
> for(int i = 0; i < 1000; i++) {
>     l1.add(new MyObject(MyEnum.VALUE1));
> }
>
> JavaRDD<MyObject> myObjectRDD1 = jsc.parallelize(l1);
> JavaRDD<MyObject> myObjectRDD2 = jsc.parallelize(l1);
>
> System.out.println("myObjectRDD1 count  = " +
> myObjectRDD1.count());
> System.out.println("myObjectRDD2 count  = " +
> myObjectRDD2.count());
>
> System.out.println("///
> Distinct");
>
> JavaRDD myObjectRDD1Distinct = myObjectRDD1.distinct();
> JavaRDD myObjectRDD2Distinct = myObjectRDD2.distinct();
>
> System.out.println("myObjectRDD1Distinct count  = " +
> myObjectRDD1Distinct.count());
> System.out.println("myObjectRDD2Distinct count  = " +
> myObjectRDD2Distinct.count());
>
> System.out.println("///
> Subtract");
>
> JavaRDD myObjectRDD1Minus1 =
> myObjectRDD1.subtract(myObjectRDD1);
> JavaRDD myObjectRDD1Minus2 =
> myObjectRDD1.subtract(myObjectRDD2);
> JavaRDD myObjectRDD2Minus1 =
> myObjectRDD2.subtract(myObjectRDD1);
>
> System.out.println("myObjectRDD1Minus1 count= " +
> myObjectRDD1Minus1.count());
> System.out.println("myObjectRDD1Minus2 count= " +
> myObjectRDD1Minus2.count());
> System.out.println("myObjectRDD2Minus1 count= " +
> myObjectRDD2Minus1.count());
>
> System.out.println("///
> End");
>   }
>
> }
>
>
> Both RDDs contain 1000 exactly equal objects, so one would expect each call
> of distinct() to result in 1 and subtract(JavaRDD<MyObject>) to result in
> empty RDDs. However, here is some sample output:
>
>
> 

Enum values in custom objects mess up RDD operations

2015-08-06 Thread Warfish
Hi everyone,

I have been working with Spark for a little while now and have encountered
some very strange behaviour that has caused me a lot of headaches:

I have written my own POJOs to encapsulate my data, and this data is held in
some JavaRDDs. One member variable of these POJOs is of a custom enum type.
Whenever I run operations on these RDDs such as subtract, groupByKey or
reduce, the results are inconsistent and nonsensical. However, this happens
only when the application runs in standalone cluster mode (10 nodes). When
running locally on my developer machine, the code executes just fine. If you
want to reproduce this behaviour, here is the complete Maven project that you
can run out of the box. I am running Spark 1.4.0 and submitting the
application using
/usr/local/spark-1.4.0-bin-hadoop2.4/bin/spark-submit --class
de.spark.test.Main ./SparkTest-1.0-SNAPSHOT.jar



Consider the following code for my custom object:


package de.spark.test;

import java.io.Serializable;
import java.util.Objects;

public class MyObject implements Serializable {

  private MyEnum myEnum;

  public MyObject(MyEnum myEnum) {
    this.myEnum = myEnum;
  }

  public MyEnum getMyEnum() {
    return myEnum;
  }

  public void setMyEnum(MyEnum myEnum) {
    this.myEnum = myEnum;
  }

  @Override
  public int hashCode() {
    int hash = 5;
    hash = 41 * hash + Objects.hashCode(this.myEnum);
    return hash;
  }

  @Override
  public boolean equals(Object obj) {
    if (obj == null) {
      return false;
    }
    if (getClass() != obj.getClass()) {
      return false;
    }
    final MyObject other = (MyObject) obj;
    if (this.myEnum != other.myEnum) {
      return false;
    }
    return true;
  }

  @Override
  public String toString() {
    return "MyObject{" + "myEnum=" + myEnum + '}';
  }

}


As you can see, I have overridden equals() and hashCode() (both are
auto-generated). The enum is given as follows:


package de.spark.test;

import java.io.Serializable;

public enum MyEnum implements Serializable {
  VALUE1, VALUE2
}


The main() method is defined by:


package de.spark.test;

import java.util.ArrayList;
import java.util.List;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class Main {

  public static void main(String[] args) {
    SparkConf conf = new SparkConf().setAppName("Spark Test")
        .setMaster("myMaster");

    JavaSparkContext jsc = new JavaSparkContext(conf);

    System.out.println("/// Object generation");

    List<MyObject> l1 = new ArrayList<>();

    for (int i = 0; i < 1000; i++) {
      l1.add(new MyObject(MyEnum.VALUE1));
    }

    JavaRDD<MyObject> myObjectRDD1 = jsc.parallelize(l1);
    JavaRDD<MyObject> myObjectRDD2 = jsc.parallelize(l1);

    System.out.println("myObjectRDD1 count  = " + myObjectRDD1.count());
    System.out.println("myObjectRDD2 count  = " + myObjectRDD2.count());

    System.out.println("/// Distinct");

    JavaRDD<MyObject> myObjectRDD1Distinct = myObjectRDD1.distinct();
    JavaRDD<MyObject> myObjectRDD2Distinct = myObjectRDD2.distinct();

    System.out.println("myObjectRDD1Distinct count  = " +
        myObjectRDD1Distinct.count());
    System.out.println("myObjectRDD2Distinct count  = " +
        myObjectRDD2Distinct.count());

    System.out.println("/// Subtract");

    JavaRDD<MyObject> myObjectRDD1Minus1 = myObjectRDD1.subtract(myObjectRDD1);
    JavaRDD<MyObject> myObjectRDD1Minus2 = myObjectRDD1.subtract(myObjectRDD2);
    JavaRDD<MyObject> myObjectRDD2Minus1 = myObjectRDD2.subtract(myObjectRDD1);

    System.out.println("myObjectRDD1Minus1 count= " +
        myObjectRDD1Minus1.count());
    System.out.println("myObjectRDD1Minus2 count= " +
        myObjectRDD1Minus2.count());
    System.out.println("myObjectRDD2Minus1 count= " +
        myObjectRDD2Minus1.count());

    System.out.println("/// End");
  }
  
}


Both RDDs contain 1000 exactly equal objects, so one would expect each call
of distinct() to result in 1 and subtract(JavaRDD<MyObject>) to result in
empty RDDs. However, here is some sample output:


/// Object generation
myObjectRDD1 count  = 1000
myObjectRDD2 count  = 1000
/// Distinct
myObjectRDD1Distinct count  = 1
myObjectRDD2Distinct count  = 2
/// Subtract
myObjectRDD1Minus1 count= 500
myObjectRDD1Minus2 count= 0
myObjectRDD2Minus1 count= 0
/// End


And this is a new run, directly foll
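
One possible explanation, not confirmed anywhere in this thread: Enum.hashCode() is inherited from Object and is identity-based, so the same constant can hash to different values in different JVMs. Operations that hash-partition records across executors (distinct, subtract, groupByKey) could then route equal objects to different partitions. The sketch below is a hypothetical variant of MyObject (the names StableHashDemo and StableMyObject are made up for illustration) whose hashCode() is derived from the constant's name instead, which is a String and therefore hashes the same in every JVM.

```java
public class StableHashDemo {

    // Same two constants as MyEnum in the message above.
    enum MyEnum { VALUE1, VALUE2 }

    // Hypothetical replacement for MyObject with a JVM-independent hash.
    static final class StableMyObject implements java.io.Serializable {
        private final MyEnum myEnum;

        StableMyObject(MyEnum myEnum) {
            this.myEnum = myEnum;
        }

        @Override
        public int hashCode() {
            // Hash the constant's name rather than the enum instance:
            // String.hashCode() is computed from the characters, so every
            // JVM produces the same value for the same name.
            int hash = 5;
            hash = 41 * hash + (myEnum == null ? 0 : myEnum.name().hashCode());
            return hash;
        }

        @Override
        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (obj == null || getClass() != obj.getClass()) {
                return false;
            }
            return myEnum == ((StableMyObject) obj).myEnum;
        }
    }

    public static void main(String[] args) {
        StableMyObject a = new StableMyObject(MyEnum.VALUE1);
        StableMyObject b = new StableMyObject(MyEnum.VALUE1);
        System.out.println("equal:     " + a.equals(b));
        System.out.println("same hash: " + (a.hashCode() == b.hashCode()));
    }
}
```

Using ordinal() instead of name() would work the same way, as long as the order of the constants never changes between the versions of the class deployed on the driver and the executors.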

Multiple Thrift servers on one Spark cluster

2015-08-06 Thread Bojan Kostic
Hi,

Is there a way to instantiate multiple Thrift servers on one Spark Cluster?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Multiple-Thrift-servers-on-one-Spark-cluster-tp24148.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Very high latency to initialize a DataFrame from partitioned parquet database.

2015-08-06 Thread Philip Weaver
I built spark from the v1.5.0-snapshot-20150803 tag in the repo and tried
again.

The initialization time is about 1 minute now, which is still pretty
terrible.

On Wed, Aug 5, 2015 at 9:08 PM, Philip Weaver 
wrote:

> Absolutely, thanks!
>
> On Wed, Aug 5, 2015 at 9:07 PM, Cheng Lian  wrote:
>
>> We've fixed this issue in 1.5 https://github.com/apache/spark/pull/7396
>>
>> Could you give it a shot to see whether it helps in your case? We've
>> observed ~50x performance boost with schema merging turned on.
>>
>> Cheng
>>
>>
>> On 8/6/15 8:26 AM, Philip Weaver wrote:
>>
>> I have a parquet directory that was produced by partitioning by two keys,
>> e.g. like this:
>>
>> df.write.partitionBy("a", "b").parquet("asdf")
>>
>>
>> There are 35 values of "a", and about 1100-1200 values of "b" for each
>> value of "a", for a total of over 40,000 partitions.
>>
>> Before running any transformations or actions on the DataFrame, just
>> initializing it like this takes *2 minutes*:
>>
>> val df = sqlContext.read.parquet("asdf")
>>
>>
>> Is this normal? Is this because it is doing some bookkeeping to discover
>> all the partitions? Is it perhaps having to merge the schema from each
>> partition? Would you expect it to get better or worse if I subpartition by
>> another key?
>>
>> - Philip
>>
>>
>>
>>
>


Re: subscribe

2015-08-06 Thread Akhil Das
Welcome aboard!

Thanks
Best Regards

On Thu, Aug 6, 2015 at 11:21 AM, Franc Carter 
wrote:

> subscribe
>


Re: No Twitter Input from Kafka to Spark Streaming

2015-08-06 Thread Akhil Das
You just pasted your Twitter credentials; consider changing them. :/

Thanks
Best Regards

On Wed, Aug 5, 2015 at 10:07 PM, narendra  wrote:

> Thanks Akash for the answer. I added endpoint to the listener and now it is
> working.
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/No-Twitter-Input-from-Kafka-to-Spark-Streaming-tp24131p24142.html
>
>


Aggregate by timestamp from json message

2015-08-06 Thread vchandra
Hi team,
I am very new to Spark; today is actually my first day.

I have a nested JSON string which contains a timestamp and a lot of other
details. I need to write multiple aggregations over these JSON messages, but
for now I only need one. If a code structure for this already exists, kindly
post it or give me some pointers to get started. Quick input will help me a
lot.
 
Sample requirement:


*Requirement: How many "Stock dispatched" events occurred in the last 1 hour*

[
  {
    "name": "Stock dispatched",
    "timestamp": "2015-04-14T10:03:10.000Z",
    "component": "Work Order",
    "sessionID": "4324--52-3-52-46-3-46-3-75",
    "properties": {
      "Priority": "3",
      "Appliance Manufacturer": "XXX game",
      "Appliance Model": "HJ 10",
      "Appliance Model Year": "2012"
    }
  },
  {
    "name": "Stock dispatched",
    "timestamp": "2015-04-14T10:04:10.000Z",
    "component": "Work Order",
    "sessionID": "4324--52-3-52-46-3-46-3-75",
    "properties": {
      "Priority": "3",
      "Appliance Manufacturer": "XXX game",
      "Appliance Model": "DJ 15",
      "Appliance Model Year": "2012"
    }
  }
]
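
As a rough illustration of the requirement above, here is a plain-Java sketch of the counting logic only. It does not use Spark or parse the JSON: it assumes the name and timestamp fields have already been extracted, and the names LastHourCount, Event and countInLastHour are hypothetical. In a Spark job the same predicate would presumably go into a filter step followed by a count.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.Arrays;
import java.util.List;

public class LastHourCount {

    // Hypothetical holder for the two fields the aggregation needs.
    static final class Event {
        final String name;
        final Instant timestamp;

        Event(String name, String isoTimestamp) {
            this.name = name;
            // The sample timestamps are ISO-8601 instants in UTC.
            this.timestamp = Instant.parse(isoTimestamp);
        }
    }

    // Counts events with the given name whose timestamp lies in (now - 1h, now].
    static long countInLastHour(List<Event> events, String name, Instant now) {
        Instant cutoff = now.minus(Duration.ofHours(1));
        return events.stream()
                .filter(e -> e.name.equals(name))
                .filter(e -> e.timestamp.isAfter(cutoff) && !e.timestamp.isAfter(now))
                .count();
    }

    public static void main(String[] args) {
        List<Event> events = Arrays.asList(
                new Event("Stock dispatched", "2015-04-14T10:03:10.000Z"),
                new Event("Stock dispatched", "2015-04-14T10:04:10.000Z"));

        // "now" is passed in explicitly so the window is reproducible.
        Instant now = Instant.parse("2015-04-14T11:03:30.000Z");
        System.out.println(countInLastHour(events, "Stock dispatched", now));
    }
}
```

Passing the reference time as a parameter, rather than calling Instant.now() inside the method, keeps the window deterministic and makes it easy to rerun the aggregation for any past hour.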



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Aggregate-by-timestamp-from-json-message-tp24147.html