How to restart Twitter spark stream

2015-07-18 Thread Zoran Jeremic
Hi,

I have a twitter spark stream initialized in the following way:

  val ssc: StreamingContext = SparkLauncher.getSparkScalaStreamingContext()
  val config = getTwitterConfigurationBuilder.build()
  val auth: Option[twitter4j.auth.Authorization] =
    Some(new twitter4j.auth.OAuthAuthorization(config))
  val stream = TwitterUtils.createStream(ssc, auth, filters)

This works fine when I initially start it. However, at some point I need to
update the filters, since users might add new hashtags they want to follow. I
tried to stop the running stream and the Spark streaming context without
stopping the Spark context, e.g.:


  stream.stop()
  ssc.stop(false)
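The re-initialization afterwards is essentially the same createStream call
again on the same (now stopped) context. A trimmed sketch of that step (the
helper names come from my code; updatedFilters is simply the new hashtag list):

  val newConfig = getTwitterConfigurationBuilder.build()
  val newAuth: Option[twitter4j.auth.Authorization] =
    Some(new twitter4j.auth.OAuthAuthorization(newConfig))
  // called from initializeNewStream() with the updated filters
  val newStream = TwitterUtils.createStream(ssc, newAuth, updatedFilters)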

When I then try to initialize the new Twitter stream this way, I get this
exception:

Exception in thread "Firestorm JMX Monitor" java.lang.IllegalStateException: Adding new inputs, transformations, and output operations after stopping a context is not supported
    at org.apache.spark.streaming.dstream.DStream.validateAtInit(DStream.scala:224)
    at org.apache.spark.streaming.dstream.DStream.<init>(DStream.scala:64)
    at org.apache.spark.streaming.dstream.InputDStream.<init>(InputDStream.scala:41)
    at org.apache.spark.streaming.dstream.ReceiverInputDStream.<init>(ReceiverInputDStream.scala:41)
    at org.apache.spark.streaming.twitter.TwitterInputDStream.<init>(TwitterInputDStream.scala:46)
    at org.apache.spark.streaming.twitter.TwitterUtils$.createStream(TwitterUtils.scala:44)
    at org.prosolo.bigdata.scala.twitter.TwitterHashtagsStreamsManager$.initializeNewStream(TwitterHashtagsStreamsManager.scala:113)
    at org.prosolo.bigdata.scala.twitter.TwitterHashtagsStreamsManager$.restartHashTagsStream(TwitterHashtagsStreamsManager.scala:174)
    at org.prosolo.bigdata.scala.twitter.TwitterHashtagsStreamsManager$.addHashTagsFromBufferAndRestartStream(TwitterHashtagsStreamsManager.scala:162)
    at org.prosolo.bigdata.scala.twitter.HashtagsUpdatesBuffer$.processBufferEvents(HashtagsUpdatesBuffer.scala:41)
    at org.prosolo.bigdata.scala.twitter.HashtagsUpdatesBuffer$$anon$1.run(HashtagsUpdatesBuffer.scala:19)
    at java.util.TimerThread.mainLoop(Timer.java:555)
    at java.util.TimerThread.run(Timer.java:505)

 INFO[2015-07-18 22:24:23,430] [Twitter Stream consumer-1[Disposing thread]] twitter4j.TwitterStreamImpl (SLF4JLogger.java:83) Inflater has been closed
ERROR[2015-07-18 22:24:32,503] [sparkDriver-akka.actor.default-dispatcher-3] streaming.receiver.ReceiverSupervisorImpl (Logging.scala:75) Error stopping receiver 0
    org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:116)

Can anybody explain how to solve this issue?

Thanks,
Zoran


Spark 1.4 application throws java.lang.NoClassDefFoundError: javax/servlet/FilterRegistration

2015-07-18 Thread Wwh 吴
hi
I have built a Spark application with IDEA. When I run SparkPI, IDEA throws
this exception:
Exception in thread "main" java.lang.NoClassDefFoundError: javax/servlet/FilterRegistration
    at org.spark-project.jetty.servlet.ServletContextHandler.<init>(ServletContextHandler.java:136)
    at org.spark-project.jetty.servlet.ServletContextHandler.<init>(ServletContextHandler.java:129)
    at org.spark-project.jetty.servlet.ServletContextHandler.<init>(ServletContextHandler.java:98)
    at org.apache.spark.ui.JettyUtils$.createServletHandler(JettyUtils.scala:108)
    at org.apache.spark.ui.JettyUtils$.createServletHandler(JettyUtils.scala:99)
    at org.apache.spark.ui.WebUI.attachPage(WebUI.scala:78)
    at org.apache.spark.ui.WebUI$$anonfun$attachTab$1.apply(WebUI.scala:62)
    at org.apache.spark.ui.WebUI$$anonfun$attachTab$1.apply(WebUI.scala:62)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.ui.WebUI.attachTab(WebUI.scala:62)
    at org.apache.spark.ui.SparkUI.initialize(SparkUI.scala:61)
    at org.apache.spark.ui.SparkUI.<init>(SparkUI.scala:74)
    at org.apache.spark.ui.SparkUI$.create(SparkUI.scala:190)
    at org.apache.spark.ui.SparkUI$.createLiveUI(SparkUI.scala:141)
    at org.apache.spark.SparkContext.<init>(SparkContext.scala:440)
    at org.learn.SparkPI$.main(SparkPI.scala:27)
    at org.learn.SparkPI.main(SparkPI.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:134)
Caused by: java.lang.ClassNotFoundException: javax.servlet.FilterRegistration
    at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
And the SparkPI application looks like this:

package org.learn

import org.apache.spark.{SparkConf, SparkContext}
import scala.math.random

object SparkPI {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Spark Pi")
    conf.setMaster("local")
    val spark = new SparkContext(conf)

    //spark.addJar("D:\\BigdataResearch\\SparkLeaning\\out\\artifacts\\sparkleaning_jar\\sparkleaning.jar")
    val slices = if (args.length > 0) args(0).toInt else 2
    val n = 10 * slices
    val count = spark.parallelize(1 to n, slices).map { i =>
      val x = random * 2 - 1
      val y = random * 2 - 1
      if (x * x + y * y < 1) 1 else 0
    }.reduce(_ + _)
    println("Pi is roughly " + 4.0 * count / n)
    spark.stop()
  }
}

And the build.sbt looks like this:

name := "SparkLearning"
version := "1.0"
scalaVersion := "2.10.4"
libraryDependencies ++= Seq(
  "org.apache.hive"% "hive-jdbc" % "0.13.1" ,
   "org.apache.hadoop" % "hadoop-common" % "2.2.0" excludeAll 
ExclusionRule(organization = "javax.servlet"),
  "org.apache.hadoop" % "hadoop-client" % "2.2.0" excludeAll 
ExclusionRule(organization = "javax.servlet"),
  "org.scalatest" %% "scalatest" % "2.2.0" ,
  "org.apache.spark" %% "spark-core" % "1.4.0",
  "org.apache.spark" %% "spark-sql" % "1.4.0",
  "org.apache.spark" %% "spark-hive" % "1.4.0",
  "org.apache.spark" %% "spark-mllib" % "1.4.0",
  "org.apache.spark" %% "spark-streaming" % "1.4.0",
  "org.apache.spark" %% "spark-streaming-kafka" % "1.4.0" ,
  "org.eclipse.jetty"%"jetty-servlet"%"8.1.14.v20131031",
  "org.eclipse.jetty"%"jetty-http"%"8.1.14.v20131031",
  "org.eclipse.jetty"%"jetty-server"%"8.1.14.v20131031",
  "org.eclipse.jetty"%"jetty-util"%"8.1.14.v20131031",
  "org.eclipse.jetty"%"jetty-security"%"8.1.14.v20131031",
  "org.eclipse.jetty"%"jetty-plus"%"8.1.14.v20131031",
  "org.apache.kafka"%%"kafka"%"0.8.2.1",
  "net.sf.json-lib"%"json-lib"%"2.4" from 
"http://gradle.artifactoryonline.com/gradle/libs/net/sf/json-lib/json-lib/2.4/json-lib-2.4-jdk15.jar";,
  "com.databricks"%%"spark-csv"%"1.0.3"
)

Please give me some suggestions!
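One guess I have (untested): the excludeAll rules on javax.servlet may also be
dropping the servlet 3.0 API that Spark's bundled Jetty needs for
javax.servlet.FilterRegistration. If so, adding the servlet API back explicitly
in build.sbt might help, e.g.:

libraryDependencies += "javax.servlet" % "javax.servlet-api" % "3.0.1"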
  

[General Question] [Hadoop + Spark at scale] Spark Rack Awareness ?

2015-07-18 Thread Mike Frampton
I wanted to ask a general question about Hadoop/YARN and Apache Spark
integration. I know that Hadoop on a physical cluster has rack awareness,
i.e. it attempts to minimise network traffic by saving replicated blocks
within a rack.

I wondered whether, when Spark is configured to use YARN as a cluster manager,
it is able to use this feature to also minimise network traffic to a degree.

Sorry if this question is not quite accurate, but I think you can generally
see what I mean?
  

Re: No. of Task vs No. of Executors

2015-07-18 Thread David Mitchell
This is likely due to data skew.  If you are using key-value pairs, one key
has a lot more records than the other keys.  Do you have any groupBy
operations?
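
A quick way to check for skew, assuming your data is a keyed RDD (pairRdd here
just stands for your own RDD of key-value pairs), is to count records per key
and look at the heaviest keys:

// records per key
val keyCounts = pairRdd.map { case (k, _) => (k, 1L) }.reduceByKey(_ + _)
// the ten keys with the most records
keyCounts.sortBy(_._2, ascending = false).take(10).foreach(println)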

David


On Tue, Jul 14, 2015 at 9:43 AM, shahid  wrote:

> hi
>
> I have a 10 node cluster. I loaded the data onto HDFS, so the no. of
> partitions I get is 9. I am running a Spark application and it gets stuck on
> one of the tasks; looking at the UI it seems the application is not using all
> nodes to do calculations. Attached is a screenshot of the tasks; it seems
> tasks are put on each node more than once. Looking at the tasks, 8 tasks get
> completed in 7-8 minutes and one task takes around 30 minutes, causing the
> delay in results.
> <
> http://apache-spark-user-list.1001560.n3.nabble.com/file/n23824/Screen_Shot_2015-07-13_at_9.png
> >
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/No-of-Task-vs-No-of-Executors-tp23824.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>




Re: spark-shell with Yarn failed

2015-07-18 Thread Chester @work
It might be a network issue. The error says it failed to bind to the server
IP address.

Chester
Sent from my iPhone

> On Jul 18, 2015, at 11:46 AM, Amjad ALSHABANI  wrote:
> 
> Does anybody have any idea about the error I'm having? I am really 
> clueless... and would appreciate any ideas :)
> 
> Thanks in advance
> 
> Amjad
> 
>> On Jul 17, 2015 5:37 PM, "Amjad ALSHABANI"  wrote:
>> Hello,
>> 
>> First of all, I'm a newbie to Spark.
>> 
>> I'm trying to start the spark-shell against a YARN cluster by running:
>> 
>> $ spark-shell --master yarn-client
>> 
>> Sometimes it goes well, but most of the time I get an error:
>> 
>> Container exited with a non-zero exit code 10
>> Failing this attempt. Failing the application.
>>  ApplicationMaster host: N/A
>>  ApplicationMaster RPC port: -1
>>  queue: default
>>  start time: 1437145851944
>>  final status: FAILED
>>  tracking URL: 
>> http://My-HadoopServer:50080/cluster/app/application_143708028_0030
>>  user: hadoop
>> org.apache.spark.SparkException: Yarn application has already ended! It 
>> might have been killed or unable to launch application master.
>> at 
>> org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.waitForApplication(YarnClientSchedulerBackend.scala:115)
>> 
>> 
>> 
>> 
>> searching in the yarn logs I got this log
>> 
>> $ yarn logs -applicationId application_143708028_0030
>> 2015-07-17 17:11:03,961 - INFO  
>> [sparkYarnAM-akka.actor.default-dispatcher-4:Slf4jLogger$$anonfun$receive$1$$anonfun$applyOrElse$3@74]
>>  - Starting remoting
>> 2015-07-17 17:11:04,200 - ERROR 
>> [sparkYarnAM-akka.actor.default-dispatcher-4:Slf4jLogger$$anonfun$receive$1$$anonfun$applyOrElse$1@65]
>>  - failed to bind to My-HadoopServer/10.98.105.11:0, shutting down Netty 
>> transport
>> 2015-07-17 17:11:04,210 - WARN  [main:Logging$class@71] - Service 
>> 'sparkYarnAM' could not bind on port 0. Attempting port 1.
>> ...
>> ...
>> ...
>> 2015-07-17 17:11:05,123 - ERROR [main:Logging$class@96] - Uncaught exception:
>> java.net.BindException: Failed to bind to: My-HadoopServer/HadoopServerIP:0: 
>> Service 'sparkYarnAM' failed after 16 retries!
>> at 
>> org.jboss.netty.bootstrap.ServerBootstrap.bind(ServerBootstrap.java:272)
>> at 
>> akka.remote.transport.netty.NettyTransport$$anonfun$listen$1.apply(NettyTransport.scala:393)
>> at 
>> akka.remote.transport.netty.NettyTransport$$anonfun$listen$1.apply(NettyTransport.scala:389)
>> at scala.util.Success$$anonfun$map$1.apply(Try.scala:206)
>> ...
>> 
>> 
>> 
>> 
>> 
>> I'm using Spark 1.3 and Hadoop 2.6,
>> 
>> and in spark-env.sh it points to my Hadoop configuration:
>> 
>> export HADOOP_CONF_DIR=/usr/hdp/2.2.4.4-16/hadoop/conf
>> 
>> 
>> Is this problem coming from the Spark configuration or the YARN configuration
>> (or Spark-with-YARN configs)?
>> 
>> Any Ideas??
>> 
>> 
>> 
>> Amjad


Re: spark-shell with Yarn failed

2015-07-18 Thread Amjad ALSHABANI
Does anybody have any idea about the error I'm having? I am really
clueless... and would appreciate any ideas :)

Thanks in advance

Amjad
On Jul 17, 2015 5:37 PM, "Amjad ALSHABANI"  wrote:

> Hello,
>
> First of all, I'm a newbie to Spark.
>
> I'm trying to start the spark-shell against a YARN cluster by running:
>
> $ spark-shell --master yarn-client
>
> Sometimes it goes well, but most of the time I get an error:
>
> Container exited with a non-zero exit code 10
> Failing this attempt. Failing the application.
>  ApplicationMaster host: N/A
>  ApplicationMaster RPC port: -1
>  queue: default
>  start time: 1437145851944
>  final status: FAILED
>  tracking URL:
> http://My-HadoopServer:50080/cluster/app/application_143708028_0030
>  user: hadoop
> org.apache.spark.SparkException: Yarn application has already ended! It
> might have been killed or unable to launch application master.
> at
> org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.waitForApplication(YarnClientSchedulerBackend.scala:115)
> 
> 
> 
>
> searching in the yarn logs I got this log
>
> $ yarn logs -applicationId application_143708028_0030
> 2015-07-17 17:11:03,961 - INFO
> [sparkYarnAM-akka.actor.default-dispatcher-4:Slf4jLogger$$anonfun$receive$1$$anonfun$applyOrElse$3@74]
> - Starting remoting
> 2015-07-17 17:11:04,200 - ERROR
> [sparkYarnAM-akka.actor.default-dispatcher-4:Slf4jLogger$$anonfun$receive$1$$anonfun$applyOrElse$1@65]
> - failed to bind to My-HadoopServer/10.98.105.11:0, shutting down Netty
> transport
> 2015-07-17 17:11:04,210 - WARN  [main:Logging$class@71] - Service
> 'sparkYarnAM' could not bind on port 0. Attempting port 1.
> ...
> ...
> ...
> 2015-07-17 17:11:05,123 - ERROR [main:Logging$class@96] - Uncaught
> exception:
> java.net.BindException: Failed to bind to:
> My-HadoopServer/HadoopServerIP:0: Service 'sparkYarnAM' failed after 16
> retries!
> at
> org.jboss.netty.bootstrap.ServerBootstrap.bind(ServerBootstrap.java:272)
> at
> akka.remote.transport.netty.NettyTransport$$anonfun$listen$1.apply(NettyTransport.scala:393)
> at
> akka.remote.transport.netty.NettyTransport$$anonfun$listen$1.apply(NettyTransport.scala:389)
> at scala.util.Success$$anonfun$map$1.apply(Try.scala:206)
> ...
> 
> 
>
>
>
> I'm using Spark 1.3 and Hadoop 2.6,
>
> and in spark-env.sh it points to my Hadoop configuration:
>
> export HADOOP_CONF_DIR=/usr/hdp/2.2.4.4-16/hadoop/conf
>
>
> Is this problem coming from the Spark configuration or the YARN configuration
> (or Spark-with-YARN configs)?
>
> Any Ideas??
>
>
>
> Amjad
>
>


RE: Feature Generation On Spark

2015-07-18 Thread Mohammed Guller
Try this (replace ... with the appropriate values for your environment):

import org.apache.spark.rdd.RDD
import org.apache.spark.SparkContext
import org.apache.spark.mllib.feature.HashingTF
import org.apache.spark.mllib.linalg.Vector

val sc = new SparkContext(...)
val documents = sc.wholeTextFiles(...)
val tokenized = documents.map{ case(path, document) => (path, 
document.split("\\s+"))}
val numFeatures = 10
val hashingTF = new HashingTF(numFeatures)
val featurized = tokenized.map{case(path, words) => (path, 
hashingTF.transform(words))}
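
A quick sanity check on the result (this just prints one (path, term-frequency
vector) pair):

featurized.take(1).foreach { case (path, tf) => println(path + " -> " + tf) }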


Mohammed

From: rishikesh thakur [mailto:rishikeshtha...@hotmail.com]
Sent: Friday, July 17, 2015 12:33 AM
To: Mohammed Guller
Subject: Re: Feature Generation On Spark


Thanks, I did look at the example. I am using Spark 1.2, and the modules mentioned
there are not in 1.2, I guess. The import is failing.


Rishi


From: Mohammed Guller mailto:moham...@glassbeam.com>>
Sent: Friday, July 10, 2015 2:31 AM
To: rishikesh thakur; ayan guha; Michal Čizmazia
Cc: user
Subject: RE: Feature Generation On Spark


Take a look at the examples here:

https://spark.apache.org/docs/latest/ml-guide.html



Mohammed



From: rishikesh thakur [mailto:rishikeshtha...@hotmail.com]
Sent: Saturday, July 4, 2015 10:49 PM
To: ayan guha; Michal Čizmazia
Cc: user
Subject: RE: Feature Generation On Spark



I have one document per file and each file is to be converted to a feature 
vector. Pretty much like standard feature construction for document 
classification.



Thanks

Rishi



Date: Sun, 5 Jul 2015 01:44:04 +1000
Subject: Re: Feature Generation On Spark
From: guha.a...@gmail.com
To: mici...@gmail.com
CC: rishikeshtha...@hotmail.com; 
user@spark.apache.org

Do you have one document per file or multiple document in the file?

On 4 Jul 2015 23:38, "Michal Čizmazia" 
mailto:mici...@gmail.com>> wrote:

Spark Context has a method wholeTextFiles. Is that what you need?

On 4 July 2015 at 07:04, rishikesh 
mailto:rishikeshtha...@hotmail.com>> wrote:
> Hi
>
> I am new to Spark and am working on document classification. Before model
> fitting I need to do feature generation. Each document is to be converted to
> a feature vector. However I am not sure how to do that. While testing
> locally I have a static list of tokens and when I parse a file I do a lookup
> and increment counters.
>
> In the case of Spark I can create an RDD which loads all the documents
> however I am not sure if one file goes to one executor or multiple. If the
> file is split then the feature vectors need to be merged. But I am not able
> to figure out how to do that.
>
> Thanks
> Rishi
>
>
>
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/Feature-Generation-On-Spark-tp23617.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: 
> user-unsubscr...@spark.apache.org
> For additional commands, e-mail: 
> user-h...@spark.apache.org
>

-
To unsubscribe, e-mail: 
user-unsubscr...@spark.apache.org
For additional commands, e-mail: 
user-h...@spark.apache.org


Spark-hive parquet schema evolution

2015-07-18 Thread Jerrick Hoang
Hi all,

I'm aware of the support for schema evolution via the DataFrame API. I'm just
wondering what would be the best way to go about dealing with schema
evolution for Hive metastore tables. So, say I create a table via the SparkSQL
CLI, how would I deal with Parquet schema evolution?

Thanks,
J


Re: How to extract complex JSON structures using Apache Spark 1.4.0 Data Frames

2015-07-18 Thread Naveen Madhire
I am facing the same issue. I tried this but got a compilation error for
the "$" in the explode function.

So I had to modify it to the below to make it work.

df.select(explode(new Column("entities.user_mentions")).as("mention"))




On Wed, Jun 24, 2015 at 2:48 PM, Michael Armbrust 
wrote:

> Starting in Spark 1.4 there is also an explode that you can use directly
> from the select clause (much like in HiveQL):
>
> import org.apache.spark.sql.functions._
> df.select(explode($"entities.user_mentions").as("mention"))
>
> Unlike standard HiveQL, you can also include other attributes in the
> select or even $"*".
>
>
> On Wed, Jun 24, 2015 at 8:34 AM, Yin Huai  wrote:
>
>> The function accepted by explode is f: Row => TraversableOnce[A]. Seems
>> user_mentions is an array of structs. So, can you change your
>> pattern matching to the following?
>>
>> case Row(rows: Seq[_]) => rows.asInstanceOf[Seq[Row]].map(elem => ...)
>>
>> On Wed, Jun 24, 2015 at 5:27 AM, Gustavo Arjones <
>> garjo...@socialmetrix.com> wrote:
>>
>>> Hi All,
>>>
>>> I am using the new *Apache Spark version 1.4.0 Data-frames API* to
>>> extract information from Twitter's Status JSON, mostly focused on the 
>>> Entities
>>> Object  - the relevant
>>> part to this question is shown below:
>>>
>>> {
>>>   ...
>>>   ...
>>>   "entities": {
>>> "hashtags": [],
>>> "trends": [],
>>> "urls": [],
>>> "user_mentions": [
>>>   {
>>> "screen_name": "linobocchini",
>>> "name": "Lino Bocchini",
>>> "id": 187356243,
>>> "id_str": "187356243",
>>> "indices": [ 3, 16 ]
>>>   },
>>>   {
>>> "screen_name": "jeanwyllys_real",
>>> "name": "Jean Wyllys",
>>> "id": 23176,
>>> "id_str": "23176",
>>> "indices": [ 79, 95 ]
>>>   }
>>> ],
>>> "symbols": []
>>>   },
>>>   ...
>>>   ...
>>> }
>>>
>>> There are several examples on how extract information from primitives
>>> types as string, integer, etc - but I couldn't find anything on how to
>>> process those kind of *complex* structures.
>>>
>>> I tried the code below but it is still doesn't work, it throws an
>>> Exception
>>>
>>> val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
>>>
>>> val tweets = sqlContext.read.json("tweets.json")
>>>
>>> // this function is just to filter empty entities.user_mentions[] nodes
>>> // some tweets doesn't contains any mentions
>>> import org.apache.spark.sql.functions.udf
>>> val isEmpty = udf((value: List[Any]) => value.isEmpty)
>>>
>>> import org.apache.spark.sql._
>>> import sqlContext.implicits._
>>> case class UserMention(id: Long, idStr: String, indices: Array[Long], name: 
>>> String, screenName: String)
>>>
>>> val mentions = tweets.select("entities.user_mentions").
>>>   filter(!isEmpty($"user_mentions")).
>>>   explode($"user_mentions") {
>>>   case Row(arr: Array[Row]) => arr.map { elem =>
>>> UserMention(
>>>   elem.getAs[Long]("id"),
>>>   elem.getAs[String]("is_str"),
>>>   elem.getAs[Array[Long]]("indices"),
>>>   elem.getAs[String]("name"),
>>>   elem.getAs[String]("screen_name"))
>>>   }
>>> }
>>>
>>> mentions.first
>>>
>>> Exception when I try to call mentions.first:
>>>
>>> scala> mentions.first
>>> 15/06/23 22:15:06 ERROR Executor: Exception in task 0.0 in stage 5.0 (TID 8)
>>> scala.MatchError: [List([187356243,187356243,List(3, 16),Lino 
>>> Bocchini,linobocchini], [23176,23176,List(79, 95),Jean 
>>> Wyllys,jeanwyllys_real])] (of class 
>>> org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema)
>>> at 
>>> $line37.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(:34)
>>> at 
>>> $line37.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(:34)
>>> at scala.Function1$$anonfun$andThen$1.apply(Function1.scala:55)
>>> at 
>>> org.apache.spark.sql.catalyst.expressions.UserDefinedGenerator.eval(generators.scala:81)
>>>
>>> What is wrong here? I understand it is related to the types but I
>>> couldn't figure out it yet.
>>>
>>> As additional context, the structure mapped automatically is:
>>>
>>> scala> mentions.printSchema
>>> root
>>>  |-- user_mentions: array (nullable = true)
>>>  ||-- element: struct (containsNull = true)
>>>  |||-- id: long (nullable = true)
>>>  |||-- id_str: string (nullable = true)
>>>  |||-- indices: array (nullable = true)
>>>  ||||-- element: long (containsNull = true)
>>>  |||-- name: string (nullable = true)
>>>  |||-- screen_name: string (nullable = true)
>>>
>>> *NOTE 1:* I know it is possible to solve this using HiveQL but I would
>>> like to use Data-frames once there is so much momentum around it.
>>>
>>> SELECT explode(entities.user_mentions) as mentions
>>> FROM tweets
>>>
>>> *NOTE 2:* the *UDF* val isEmpty = udf((value: List[Any]) =>
>>> value.isEmpty) is a ugly h

Re: DataFrame more efficient than RDD?

2015-07-18 Thread Ted Yu
Here is a related thread:
http://search-hadoop.com/m/q3RTtPmjSJ1Dod92


> On Jul 15, 2015, at 7:41 AM, k0ala  wrote:
> 
> Hi,
> 
> I have been working a bit with RDDs, and am now taking a look at DataFrames.
> The schema definition using case classes looks very attractive:
> 
> https://spark.apache.org/docs/1.4.0/sql-programming-guide.html#inferring-the-schema-using-reflection
> 
>   
> 
> Is a DataFrame more efficient (space-wise) than an RDD for the same case
> class?
> 
> And in general, when should DataFrames be preferred over RDDs, and vice
> versa?
> 
> 
> 
> 
> 
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/DataFrame-more-efficient-than-RDD-tp23857.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
> 
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
> 


Using Dataframe write with newHadoopApi

2015-07-18 Thread ayan guha
Hi

I am trying to use a DataFrame and save it to Elasticsearch using the
newHadoopApi (because I am using Python). Can anyone guide me on whether this
is even possible?

-- 
Best Regards,
Ayan Guha


BigQuery connector for pyspark via Hadoop Input Format example

2015-07-18 Thread lfiaschi
I have a large dataset stored in a BigQuery table and I would like to load
it into a pyspark RDD for ETL data processing.

I realized that BigQuery supports the Hadoop Input / Output format

https://cloud.google.com/hadoop/writing-with-bigquery-connector

and pyspark should be able to use this interface in order to create an RDD
by using the method "newAPIHadoopRDD".

http://spark.apache.org/docs/latest/api/python/pyspark.html

Unfortunately, the documentation on both ends seems scarce and goes beyond
my knowledge of Hadoop/Spark/BigQuery. Is there anybody who has figured out
how to do this?

Thanks!



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/BigQuery-connector-for-pyspark-via-Hadoop-Input-Format-example-tp23900.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: K Nearest Neighbours

2015-07-18 Thread Gylfi
Hi. 

What I would do in your case would be something like this.

Let's call the two datasets qs and ds, where qs is an array of vectors and
ds is an RDD[(dsID: Long, Vector)].

Do the following:
1) Create a k-NN class that can keep track of the k nearest neighbors so
far. It must have a qsID, some structure for the k nearest neighbors, e.g.
Seq[(dsID: Long, distance: Double)], and a function .add( nn : (Long, Vector) )
that will do the distance calculation and update the k-NN when appropriate.
2) Collect the qs and key it as well, so each qs has an ID, i.e. qs =
Array[(qsID : Long, Vector)]

Now what you want to do is not create all the distance stuff, but just the
k-NNs. To do this we will actually create a few k-NNs for each query vector,
one for each partition, and then merge them later.

3) Do a ds.mapPartitions() and, inside the function, create a k-NN for each
qs, scan the ds points of the partition, and output an iterator pointing
to the set of k-NNs created.
val k = 100
val qs: Array[(Long, Vector)] = ...   // the collected, keyed query points from step 2
val ds: RDD[(Long, Vector)] = ...     // the distributed dataset
val knnResults = ds.mapPartitions( itr => {
  val knns = qs.map( qp => (qp._1, new KNNClass(k, qp)) )
  itr.foreach( dp => {
    knns.foreach( knn => knn._2.add( dp ) )
  })
  knns.iterator
})

Now you have one k-NN per partition for each query point, but this we can
simply fix by doing a reduceByKey and merging all the k-NNs for each qpID into
a single k-NN.

val knnResultFinal = knnResults.reduceByKey( (a, b) => KNNClass.merge( a, b)
)

Where KNNClass.merge is a static function that merges the two k-NNs: we simply
concatenate them, sort on distance, then take the k top values and return them
as a new k-NN instance.
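
For concreteness, a rough sketch of what such a KNNClass could look like (this
is my own illustration, not an existing library class; it assumes dense vectors
of equal length and squared Euclidean distance):

import org.apache.spark.mllib.linalg.Vector

// Keeps the k nearest (dsID, distance) pairs seen so far for one query point.
class KNNClass(val k: Int, val query: (Long, Vector)) extends Serializable {
  var neighbors: List[(Long, Double)] = Nil

  // Squared Euclidean distance between the query vector and a dataset vector.
  private def dist(v: Vector): Double = {
    val a = query._2.toArray
    val b = v.toArray
    var s = 0.0
    var i = 0
    while (i < a.length) { val d = a(i) - b(i); s += d * d; i += 1 }
    s
  }

  // Add one candidate point and keep only the k closest seen so far.
  def add(dp: (Long, Vector)): Unit = {
    neighbors = ((dp._1, dist(dp._2)) :: neighbors).sortBy(_._2).take(k)
  }
}

object KNNClass {
  // Merge two partial k-NNs computed for the same query point.
  def merge(a: KNNClass, b: KNNClass): KNNClass = {
    a.neighbors = (a.neighbors ++ b.neighbors).sortBy(_._2).take(a.k)
    a
  }
}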

If you want to control how many k-NNs are created, you can always repartition
ds.

How does that sound? Does this make any sense?   :)

Regards, 
Gylfi. 



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/K-Nearest-Neighbours-tp23759p23899.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Passing Broadcast variable as parameter

2015-07-18 Thread Gylfi
Hi.

You can use a broadcast variable to make data available to all the nodes in
your cluster, and it can live longer than just the current distributed task.

For example, if you need to access a large structure in multiple sub-tasks,
instead of sending that structure again and again with each sub-task you can
send it only once and access the data inside the operation (map, flatMap,
etc.) by way of the broadcast variable's .value

See :
https://spark.apache.org/docs/latest/programming-guide.html#broadcast-variables
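
A minimal sketch (the lookup table and variable names are illustrative, not
from the original question):

import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("broadcast-example"))

// A lookup table built on the driver.
val lookup = Map(1 -> "a", 2 -> "b", 3 -> "c")

// Ship it to the workers once instead of with every task.
val lookupBC = sc.broadcast(lookup)

val ids = sc.parallelize(Seq(1, 2, 3, 2, 1))

// Inside the closure, read the broadcast value with .value (treat it as read-only).
val labelled = ids.map(id => (id, lookupBC.value.getOrElse(id, "unknown")))

labelled.collect().foreach(println)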

Note however that you should treat the broadcast variable as a read-only
structure, as it is not synced between workers after it is broadcast.

To broadcast, your data must be serializable.

If the data you are trying to broadcast is a distributed RDD (and thus
presumably large), perhaps what you need is some form of join operation (or
cogroup)?

Regards, 
Gylfi. 



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Passing-Broadcast-variable-as-parameter-tp23760p23898.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark APIs memory usage?

2015-07-18 Thread Harit Vishwakarma
Even if I remove the numpy calls (no matrices loaded), the same exception
still occurs.
Can anyone tell me what createDataFrame does internally? Are there any
alternatives to it?

On Fri, Jul 17, 2015 at 6:43 PM, Akhil Das 
wrote:

> I suspect its the numpy filling up Memory.
>
> Thanks
> Best Regards
>
> On Fri, Jul 17, 2015 at 5:46 PM, Harit Vishwakarma <
> harit.vishwaka...@gmail.com> wrote:
>
>> 1. load 3 matrices of size ~ 1 X 1 using numpy.
>> 2. rdd2 = rdd1.values().flatMap( fun )  # rdd1 has roughly 10^7 tuples
>> 3. df = sqlCtx.createDataFrame(rdd2)
>> 4. df.save() # in parquet format
>>
>> It throws an exception in the createDataFrame() call. I don't know exactly what
>> it is creating. Is everything in memory? Or can I make it persist
>> simultaneously while it is being created?
>>
>> Thanks
>>
>>
>> On Fri, Jul 17, 2015 at 5:16 PM, Akhil Das 
>> wrote:
>>
>>> Can you paste the code? How much memory does your system have and how
>>> big is your dataset? Did you try df.persist(StorageLevel.MEMORY_AND_DISK)?
>>>
>>> Thanks
>>> Best Regards
>>>
>>> On Fri, Jul 17, 2015 at 5:14 PM, Harit Vishwakarma <
>>> harit.vishwaka...@gmail.com> wrote:
>>>
 Thanks,
 Code is running on a single machine.
 And it still doesn't answer my question.

 On Fri, Jul 17, 2015 at 4:52 PM, ayan guha  wrote:

> You can bump up number of partitions while creating the rdd you are
> using for df
> On 17 Jul 2015 21:03, "Harit Vishwakarma" 
> wrote:
>
>> Hi,
>>
>> I used createDataFrame API of SqlContext in python. and getting
>> OutOfMemoryException. I am wondering if it is creating whole dataFrame in
>> memory?
>> I did not find any documentation describing memory usage of Spark
>> APIs.
>> Documentation given is nice but little more details (specially on
>> memory usage/ data distribution etc.) will really help.
>>
>> --
>> Regards
>> Harit Vishwakarma
>>
>>


 --
 Regards
 Harit Vishwakarma


>>>
>>
>>
>> --
>> Regards
>> Harit Vishwakarma
>>
>>
>


-- 
Regards
Harit Vishwakarma


PicklingError: Could not pickle object as excessively deep recursion required.

2015-07-18 Thread Andrej Burja
hi

On Windows, in local mode, using pyspark, I got an error about "excessively
deep recursion".
I'm using a module for lemmatizing/stemming, which uses a dll and
some binary files (the module is a Python wrapper around C code).
Spark version 1.4.0.
Any idea what is going on?

---
PicklingError Traceback (most recent call last)
 in ()
  1 df1 = df.map(lambda p: lemmatizer.lemmatize('working'))
> 2 df1.take(1)

C:\spark/python\pyspark\rdd.pyc in take(self, num)
   1263
   1264 p = range(partsScanned, min(partsScanned +
numPartsToTry, totalParts))
-> 1265 res = self.context.runJob(self, takeUpToNumLeft, p,
True)
   1266
   1267 items += res

C:\spark/python\pyspark\context.pyc in runJob(self, rdd, partitionFunc,
partitions, allowLocal)
878 # SparkContext#runJob.
879 mappedRDD = rdd.mapPartitions(partitionFunc)
--> 880 port = self._jvm.PythonRDD.runJob(self._jsc.sc(),
mappedRDD._jrdd, partitions,
881   allowLocal)
882 return list(_load_from_socket(port,
mappedRDD._jrdd_deserializer))

C:\spark/python\pyspark\rdd.pyc in _jrdd(self)
   2349 command = (self.func, profiler,
self._prev_jrdd_deserializer,
   2350self._jrdd_deserializer)
-> 2351 pickled_cmd, bvars, env, includes =
_prepare_for_python_RDD(self.ctx, command, self)
   2352 python_rdd = self.ctx._jvm.PythonRDD(self._prev_jrdd.rdd(),
   2353  bytearray(pickled_cmd),

C:\spark/python\pyspark\rdd.pyc in _prepare_for_python_RDD(sc, command, obj)
   2269 # the serialized command will be compressed by broadcast
   2270 ser = CloudPickleSerializer()
-> 2271 pickled_command = ser.dumps(command)
   2272 if len(pickled_command) > (1 << 20):  # 1M
   2273 # The broadcast will have same life cycle as created
PythonRDD

C:\spark/python\pyspark\serializers.pyc in dumps(self, obj)
425
426 def dumps(self, obj):
--> 427 return cloudpickle.dumps(obj, 2)
428
429

C:\spark/python\pyspark\cloudpickle.pyc in dumps(obj, protocol)
620
621 cp = CloudPickler(file,protocol)
--> 622 cp.dump(obj)
623
624 return file.getvalue()

C:\spark/python\pyspark\cloudpickle.pyc in dump(self, obj)
109 if 'recursion' in e.args[0]:
110 msg = """Could not pickle object as excessively
deep recursion required."""
--> 111 raise pickle.PicklingError(msg)
112
113 def save_memoryview(self, obj):

PicklingError: Could not pickle object as excessively deep recursion
required.


Re: write a HashMap to HDFS in Spark

2015-07-18 Thread Gylfi
Hi. 

Assuming you have the data in an RDD, you can save your RDD (regardless of
structure) with "nameRDD".saveAsObjectFile("path"), where "path" can be
"hdfs:///myfolderonHDFS" or a path on the local file system.

Alternatively, you can also use .saveAsTextFile()
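
A minimal sketch (the path and data here are illustrative):

// Save an RDD of (String, Int) pairs as Java-serialized object files on HDFS.
val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("c", 3)))
pairs.saveAsObjectFile("hdfs:///user/me/pairs_obj")

// Reading it back requires the element type as a type parameter.
val restored = sc.objectFile[(String, Int)]("hdfs:///user/me/pairs_obj")
restored.collect().foreach(println)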

Regards, 
Gylfi. 



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/write-a-HashMap-to-HDFS-in-Spark-tp23813p23897.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: No. of Task vs No. of Executors

2015-07-18 Thread Gylfi
You could even try changing the block size of the input data on HDFS (this can
be done on a per-file basis), and that would get all workers going right from
the get-go in Spark.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/No-of-Task-vs-No-of-Executors-tp23824p23896.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Create RDD from output of unix command

2015-07-18 Thread Gylfi
You may want to look into using the pipe command .. 
http://blog.madhukaraphatak.com/pipe-in-spark/
http://spark.apache.org/docs/0.6.0/api/core/spark/rdd/PipedRDD.html
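
A minimal sketch of RDD.pipe (the shell command is illustrative and must be
available on the worker nodes):

// Each partition's elements are written to the command's stdin, one per line;
// the command's stdout lines come back as a new RDD[String].
val nums = sc.parallelize(1 to 10, 2)
val piped = nums.pipe("grep 1")
piped.collect().foreach(println)   // lines containing "1": 1 and 10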




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Create-RDD-from-output-of-unix-command-tp23723p23895.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Using reference for RDD is safe?

2015-07-18 Thread Gylfi
Hi. 

"All transformations in Spark are lazy, in that they do not compute their
results right away. Instead, they just remember the transformations applied
to some base dataset (e.g. a file). The transformations are only computed
when an action requires a result to be returned to the driver program. This
design enables Spark to run more efficiently – for example, we can realize
that a dataset created through map will be used in a reduce and return only
the result of the reduce to the driver, rather than the larger mapped
dataset."
See section "RDD Operations" in
https://spark.apache.org/docs/1.2.0/programming-guide.html

Thus, neither your myrdd2 nor your myrdd will exist until you call the count.
What is stored is just "how to create myrdd and myrdd2", so yes, this is
safe.

When you run myrdd2.count, both RDDs are created, myrdd2 is counted and
the count is printed out.
After the operation both RDDs are "destroyed" again.
If you run myrdd2.count again, both myrdd and myrdd2 are created again.

If your transformation is expensive, you may want to keep the data around,
and for that you must use .persist() or .cache() etc.
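
A minimal sketch of that behaviour (the input path and transformation are
illustrative):

val myrdd = sc.textFile("hdfs:///some/input")      // nothing is computed yet
val myrdd2 = myrdd.map(line => line.toUpperCase)   // still nothing is computed

println(myrdd2.count())   // both RDDs are materialized here, then discarded

myrdd2.cache()            // keep myrdd2 around once it has been computed
println(myrdd2.count())   // computed once and cached
println(myrdd2.count())   // served from the cache this time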

Regards,
   Gylfi. 




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Using-reference-for-RDD-is-safe-tp23843p23894.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark same execution time on 1 node and 5 nodes

2015-07-18 Thread Gylfi
Hi. 

If I just look at the two pics, I see that there is only one sub-task that
takes all the time:
the flatMapToPair at Coef... line 52.
I also see that there are only two partitions that make up the input, and
thus probably only two workers are active.

Try repartitioning the data into more parts before line 52 by calling
"rddname".repartition(10), for example, and see if it runs faster.

Regards, 
Gylfi. 



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-same-execution-time-on-1-node-and-5-nodes-tp23866p23893.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark and SQL Server

2015-07-18 Thread Davies Liu
I think you have a mistake in the call to jdbc(); it should be:

jdbc(self, url, table, mode, properties)

You used properties as the third parameter.

On Fri, Jul 17, 2015 at 10:15 AM, Young, Matthew T
 wrote:
> Hello,
>
> I am testing Spark interoperation with SQL Server via JDBC with Microsoft’s 
> 4.2 JDBC Driver. Reading from the database works ok, but I have encountered a 
> couple of issues writing back. In Scala 2.10 I can write back to the database 
> except for a couple of types.
>
>
> 1.  When I read a DataFrame from a table that contains a datetime column 
> it comes in as a java.sql.Timestamp object in the DataFrame. This is alright 
> for Spark purposes, but when I go to write this back to the database with 
> df.write.jdbc(…) it errors out because it is trying to write the TimeStamp 
> type to SQL Server, which is not a date/time storing type in TSQL. I think it 
> should be writing a datetime, but I’m not sure how to tell Spark this.
>
>
>
> 2.  A related misunderstanding happens when I try to write a 
> java.lang.boolean to the database; it errors out because Spark is trying to 
> specify the width of the bit type, which is illegal in SQL Server (error msg: 
> Cannot specify a column width on data type bit). Do I need to edit Spark 
> source to fix this behavior, or is there a configuration option somewhere 
> that I am not aware of?
>
>
> When I attempt to write back to SQL Server in an IPython notebook, py4j seems 
> unable to convert a Python dict into a Java hashmap, which is necessary for 
> parameter passing. I’ve documented details of this problem with code examples 
> here.
>  Any advice would be appreciated.
>
> Thank you for your time,
>
> -- Matthew Young
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Flatten list

2015-07-18 Thread Gylfi
Hi. 

To be honest I don't really understand your problem statement :( but let's
just talk about how .flatMap works.
Unlike .map(), which only allows a one-to-one transformation, .flatMap()
allows 0, 1 or many outputs per item processed, but the output must take the
form of a sequence of the same type, like a /List/ for example.
All the sequences will then be merged (i.e. flattened) in the end into a
single RDD of that type.
Note however that an Array does not inherit from Sequence, and thus you must
transform it to a Sequence or something that inherits from AbstractSeq, like
a List.
See 
http://www.scala-lang.org/api/current/index.html#scala.collection.immutable.List
 
vs. 
http://www.scala-lang.org/api/current/index.html#scala.Array

For example, let's assume you have an RDD[(Array[Int])] and you want all the
Int values flattened into a single RDD[(Int)]. The code would be something
like so:

val intArraysRDD : RDD[(Array[Int])] = ..."some code to get array"...
val flattnedIntRDD : RDD[(Int)] = intArraysRDD.flatMap( array => {
  var ret : List[(Int)] = Nil
  for ( i <- array ) {
    ret = i :: ret
  }
  ret
})

This is an intentionally explicit version.
A simpler version would be something like this:
val flattnedIntRDD : RDD[(Int)] = intArraysRDD.flatMap( array => array.toList)

However, to understand your problem exactly, you need to explain better what
the RDD you want to create should look like.
 
Regards, 
   Gylfi. 



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Flatten-list-tp23887p23892.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org