RE: Spark streaming on spark-standalone/ yarn inside Spring XD

2015-09-17 Thread Vignesh Radhakrishnan
Okay,  thanks anyways. Will keep looking into it and revert on this forum if I 
come across any solution

From: Tathagata Das [mailto:t...@databricks.com]
Sent: 17 September 2015 02:36
To: Vignesh Radhakrishnan 
Cc: user@spark.apache.org
Subject: Re: Spark streaming on spark-standalone/ yarn inside Spring XD

I am not at all familiar with how Spring XD works, so it's hard to say.

On Wed, Sep 16, 2015 at 12:12 PM, Vignesh Radhakrishnan 
> wrote:

Yes, it is TD. I'm able to run word count etc. on Spark standalone / YARN when 
it's not integrated with Spring XD.
But the same breaks when used as a processor on Spring XD. I was trying to get an 
opinion on whether it's doable or it's something that's not supported at the 
moment.
On 16 Sep 2015 23:50, Tathagata Das 
> wrote:
I would check the following.

See if your setup (spark master, etc.) is correct for running simple 
applications on YARN/Standalone, like the SparkPi example.
If that does not work, then the problem is in the Spark setup itself rather than 
Spring XD. If it works, then the problem is likely in the Spring XD integration.
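
For example, something along these lines against your standalone master (the examples jar path is a placeholder and varies with the Spark build):

./bin/spark-submit --class org.apache.spark.examples.SparkPi \
  --master spark://abcd.hadoop.ambari:7077 \
  lib/spark-examples-*.jar 10

and similarly with --master yarn-client for the YARN case.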

On Wed, Sep 16, 2015 at 5:01 AM, Vignesh Radhakrishnan 
> wrote:
Hi,  I am trying to run a Spark processor on Spring XD for streaming operation.

The spark processor module on Spring XD works when spark is pointing to local. 
The processor fails to run when we point spark to spark standalone (running on 
the same machine) or yarn-client.  Is it possible to run a Spark processor on 
Spark standalone or YARN inside Spring XD, or is Spark local the only option 
here?

The processor module is:

class WordCount extends Processor[String, (String, Int)] {

  def process(input: ReceiverInputDStream[String]): DStream[(String, Int)] = {
    val words = input.flatMap(_.split(" "))
    val pairs = words.map(word => (word, 1))
    val wordCounts = pairs.reduceByKey(_ + _)
    wordCounts
  }

  @SparkConfig
  def properties : Properties = {
val props = new Properties()
// Any specific Spark configuration properties would go here.
// These properties always get the highest precedence
//props.setProperty("spark.master", "spark://a.b.c.d:7077")
props.setProperty("spark.master", "spark://abcd.hadoop.ambari:7077")
props
  }

}

Below is the error log that I get:

// Error Log

2015-09-16T14:28:48+0530 1.2.0.RELEASE INFO DeploymentsPathChildrenCache-0 
container.DeploymentListener - Deploying module 'log' for stream 
'spark-streaming-word-count'
2015-09-16T14:28:48+0530 1.2.0.RELEASE INFO DeploymentsPathChildrenCache-0 
container.DeploymentListener - Deploying module [ModuleDescriptor@6dbc4f81 
moduleName = 'log', moduleLabel = 'log', group = 'spark-streaming-word-count', 
sourceChannelName = [null], sinkChannelName = [null], index = 2, type = sink, 
parameters = map[[empty]], children = list[[empty]]]
2015-09-16T14:28:48+0530 1.2.0.RELEASE INFO DeploymentsPathChildrenCache-0 
container.DeploymentListener - Path cache event: 
path=/deployments/modules/allocated/4ff3ba84-e6ca-47dd-894f-aa92bdbb3e06/spark-streaming-word-count.processor.processor.1,
 type=CHILD_ADDED
2015-09-16T14:28:48+0530 1.2.0.RELEASE INFO DeploymentsPathChildrenCache-0 
container.DeploymentListener - Deploying module 'processor' for stream 
'spark-streaming-word-count'
2015-09-16T14:28:48+0530 1.2.0.RELEASE INFO DeploymentsPathChildrenCache-0 
container.DeploymentListener - Deploying module [ModuleDescriptor@5e16dafb 
moduleName = 'scala-word-count', moduleLabel = 'processor', group = 
'spark-streaming-word-count', sourceChannelName = [null], sinkChannelName = 
[null], index = 1, type = processor, parameters = map[[empty]], children = 
list[[empty]]]
2015-09-16T14:28:49+0530 1.2.0.RELEASE WARN DeploymentsPathChildrenCache-0 
util.NativeCodeLoader - Unable to load native-hadoop library for your 
platform... using builtin-java classes where applicable
2015-09-16T14:28:49+0530 1.2.0.RELEASE WARN 
sparkDriver-akka.actor.default-dispatcher-3 remote.ReliableDeliverySupervisor - 
Association with remote system [akka.tcp://sparkMaster@abcd.hadoop.ambari:7077] 
has failed, address is now gated for [5000] ms. Reason is: [Disassociated].
2015-09-16T14:29:09+0530 1.2.0.RELEASE WARN 
sparkDriver-akka.actor.default-dispatcher-4 remote.ReliableDeliverySupervisor - 
Association with remote system [akka.tcp://sparkMaster@abcd.hadoop.ambari:7077] 
has failed, address is now gated for [5000] ms. Reason is: [Disassociated].
2015-09-16T14:29:18+0530 1.2.0.RELEASE INFO DeploymentsPathChildrenCache-0 
container.DeploymentListener - Path cache event: 
path=/deployments/modules/allocated/8d07cdba-557e-458a-9225-b90e5a5778ce/spark-streaming-word-count.source.http.1,
 type=CHILD_ADDED
2015-09-16T14:29:18+0530 1.2.0.RELEASE INFO DeploymentsPathChildrenCache-0 
container.DeploymentListener - Deploying 

RE: SparkR - calling as.vector() with rdd dataframe causes error

2015-09-17 Thread Sun, Rui
The existing algorithms that operate on an R data.frame can't simply operate on a 
SparkR DataFrame. They have to be re-implemented on top of the SparkR 
DataFrame API.

-Original Message-
From: ekraffmiller [mailto:ellen.kraffmil...@gmail.com] 
Sent: Thursday, September 17, 2015 3:30 AM
To: user@spark.apache.org
Subject: SparkR - calling as.vector() with rdd dataframe causes error

Hi,
I have a library of clustering algorithms that I'm trying to run in the SparkR 
interactive shell. (I am working on a proof of concept for a document 
classification tool.) Each algorithm takes a term document matrix in the form 
of a dataframe.  When I pass the method a local dataframe, the clustering 
algorithm works correctly, but when I pass it a spark rdd, it gives an error 
trying to coerce the data into a vector.  Here is the code that I'm calling 
within SparkR:

# get matrix from a file
file <-
"/Applications/spark-1.5.0-bin-hadoop2.6/examples/src/main/resources/matrix.csv"

#read it into variable
 raw_data <- read.csv(file,sep=',',header=FALSE)

#convert to a local dataframe
localDF = data.frame(raw_data)

# create the rdd
rdd  <- createDataFrame(sqlContext,localDF)

#call the algorithm with the localDF - this works
result <- galileo(localDF, model='hclust', dist='euclidean', link='ward', K=5)

#call with the rdd - this produces an error
result <- galileo(rdd, model='hclust', dist='euclidean', link='ward', K=5)

Error in as.vector(data) : 
  no method for coercing this S4 class to a vector


I get the same error if I try to directly call as.vector(rdd) as well.

Is there a reason why this works for localDF and not rdd?  Should I be doing 
something else to coerce the object into a vector?

Thanks,
Ellen



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/SparkR-calling-as-vector-with-rdd-dataframe-causes-error-tp24717.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional 
commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark Thrift Server JDBC Drivers

2015-09-17 Thread Daniel Haviv
Thank you!

On Wed, Sep 16, 2015 at 10:29 PM, Dan LaBar  wrote:

> I'm running Spark in EMR, and using the JDBC driver provided by AWS.
> Don't know if it will work outside of EMR, but it's worth a try.
>
> I've also used the ODBC driver from Hortonworks.
>
> Regards,
> Dan
>
> On Wed, Sep 16, 2015 at 8:34 AM, Daniel Haviv <
> daniel.ha...@veracity-group.com> wrote:
>
>> Hi,
>> are there any free JDBC drivers for thrift ?
>> The only ones I could find are Simba's which require a license.
>>
>> Thank,
>> Daniel
>>
>
>


Re: How to recovery DStream from checkpoint directory?

2015-09-17 Thread Akhil Das
Any kind of change to the JVM classes will make it fail. By checkpointing
the data, do you mean using checkpoint with updateStateByKey? Here's a similar
discussion from earlier which should clear your doubts, I guess:
http://mail-archives.us.apache.org/mod_mbox/spark-user/201507.mbox/%3CCA+AHuK=xoy8dsdaobmgm935goqytaaqkpqsvdaqpmojottj...@mail.gmail.com%3E
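
To make the limitation concrete, here is a minimal sketch of how checkpoint-based recovery is usually wired up (the checkpoint directory, app name and batch interval are placeholders); getOrCreate can only rebuild the context while the classes in the checkpointed DStream lineage stay compatible:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val checkpointDir = "hdfs:///checkpoints/my-streaming-app"

def createContext(): StreamingContext = {
  val conf = new SparkConf().setAppName("my-streaming-app")
  val ssc = new StreamingContext(conf, Seconds(10))
  ssc.checkpoint(checkpointDir)
  // build the DStream graph here, including the updateStateByKey transformation
  ssc
}

// Rebuilds from the checkpoint if one exists and is compatible; otherwise creates afresh.
val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
ssc.start()
ssc.awaitTermination()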

Thanks
Best Regards

On Thu, Sep 17, 2015 at 10:01 AM, Bin Wang  wrote:

> And here is another question. If I load the DStream from database every
> time I start the job, will the data be loaded when the job is failed and
> auto restart? If so, both the checkpoint data and database data are loaded,
> won't this a problem?
>
>
>
> Bin Wang 于2015年9月16日周三 下午8:40写道:
>
>> Will StreamingContex.getOrCreate do this work?What kind of code change
>> will make it cannot load?
>>
>> Akhil Das 于2015年9月16日周三 20:20写道:
>>
>>> You can't really recover from checkpoint if you alter the code. A better
>>> approach would be to use some sort of external storage (like a db or
>>> zookeeper etc) to keep the state (the indexes etc) and then when you deploy
>>> new code they can be easily recovered.
>>>
>>> Thanks
>>> Best Regards
>>>
>>> On Wed, Sep 16, 2015 at 3:52 PM, Bin Wang  wrote:
>>>
 I'd like to know if there is a way to recovery dstream from checkpoint.

 Because I stores state in DStream, I'd like the state to be recovered
 when I restart the application and deploy new code.

>>>
>>>


Re: Support of other languages?

2015-09-17 Thread Rahul Palamuttam
Hi,

Thank you for both responses.
Sun, you pointed out the exact issue I was referring to, which is
copying, serializing, and deserializing the byte array between the JVM heap and
the worker memory.
It also isn't clear why the byte array should be kept on-heap, since
the data of the parent partition is just a byte array that only makes sense
to a Python environment.
Shouldn't we be writing the byte array off-heap and providing supporting
interfaces for outside processes to read and interact with the data?
I'm probably oversimplifying what is really required to do this.
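
For anyone following along, here is a rough, purely illustrative Scala sketch of the pattern Sun describes below (it is not the actual PythonRDD/RRDD source; the framing protocol and port are made up): an RDD of opaque byte arrays whose compute() pipes each partition plus the serialized function to an external worker over a socket and reads byte-array results back.

import java.io.{DataInputStream, DataOutputStream}
import java.net.Socket

import org.apache.spark.{Partition, TaskContext}
import org.apache.spark.rdd.RDD

// Illustrative only: an RDD of opaque byte arrays backed by an external language worker.
class LanguageRDD(parent: RDD[Array[Byte]], serializedFunc: Array[Byte], workerPort: Int)
  extends RDD[Array[Byte]](parent) {

  override def getPartitions: Array[Partition] = parent.partitions

  override def compute(split: Partition, context: TaskContext): Iterator[Array[Byte]] = {
    val socket = new Socket("localhost", workerPort)

    // Writer thread: ship the serialized function, then each serialized parent record.
    new Thread("language-worker-writer") {
      override def run(): Unit = {
        val out = new DataOutputStream(socket.getOutputStream)
        out.writeInt(serializedFunc.length); out.write(serializedFunc)
        parent.iterator(split, context).foreach { bytes =>
          out.writeInt(bytes.length); out.write(bytes)
        }
        out.writeInt(-1) // end-of-partition marker
        out.flush()
      }
    }.start()

    // Reader: results come back as length-prefixed byte arrays the JVM never inspects.
    // (A real implementation also closes the socket on task completion and handles errors.)
    val in = new DataInputStream(socket.getInputStream)
    Iterator.continually(in.readInt()).takeWhile(_ >= 0).map { len =>
      val buf = new Array[Byte](len)
      in.readFully(buf)
      buf
    }
  }
}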

There is a recent JIRA which I thought was interesting with respect to our
discussion.
https://issues.apache.org/jira/browse/SPARK-10399

There's also a suggestion, at the bottom of the JIRA, that considers
exposing on-heap memory which is pretty interesting.

- Rahul Palamuttam


On Wed, Sep 9, 2015 at 4:52 AM, Sun, Rui  wrote:

> Hi, Rahul,
>
> To support a new language other than Java/Scala in spark, it is different
> between RDD API and DataFrame API.
>
> For RDD API:
>
> RDD is a distributed collection of the language-specific data types whose
> representation is unknown to JVM. Also transformation functions for RDD are
> written in the language which can't be executed on JVM. That's why worker
> processes of the language runtime are needed in such case. Generally, to
> support RDD API in the language, a subclass of the Scala RDD is needed on
> JVM side (for example, PythonRDD for python, RRDD for R) where compute() is
> overridden to send the serialized parent partition data (yes, what you mean
> data copy happens here) and the serialized transformation function via
> socket to the worker process. The worker process deserializes the partition
> data and the transformation function, then applies the function to the
> data. The result is sent back to JVM via socket after serialization as byte
> array. From JVM's viewpoint, the resulting RDD is a collection of byte
> arrays.
>
> Performance is a concern in such case, as there are overheads, like
> launching of worker processes, serialization/deserialization of partition
> data, bi-directional communication cost of the data.
> Besides, as the JVM can't know the real representation of data in the RDD,
> it is difficult and complex to support shuffle and aggregation operations.
> The Spark Core's built-in aggregator and shuffle can't be utilized
> directly. There should be language specific implementation to support these
> operations, which cause additional overheads.
>
> Additional memory occupation by the worker processes is also a concern.
>
> For DataFrame API:
>
> Things are much simpler than RDD API. For DataFrame, data is read from
> Data Source API and is represented as native objects within the JVM and
> there is no language-specific transformation functions. Basically,
> DataFrame API in the language are just method wrappers to the corresponding
> ones in Scala DataFrame API.
>
> Performance is not a concern. The computation is done on native objects in
> JVM, virtually no performance lost.
>
> The only exception is UDF in DataFrame. The UDF() has to rely on language
> worker processes, similar to RDD API.
>
> -Original Message-
> From: Rahul Palamuttam [mailto:rahulpala...@gmail.com]
> Sent: Tuesday, September 8, 2015 10:54 AM
> To: user@spark.apache.org
> Subject: Support of other languages?
>
> Hi,
> I wanted to know more about how Spark supports R and Python, with respect
> to what gets copied into the language environments.
>
> To clarify :
>
> I know that PySpark utilizes py4j sockets to pass pickled python functions
> between the JVM and the python daemons. However, I wanted to know how it
> passes the data from the JVM into the daemon environment. I assume it has
> to copy the data over into the new environment, since python can't exactly
> operate in JVM heap space, (or can it?).
>
> I had the same question with respect to SparkR, though I'm not completely
> familiar with how they pass around native R code through the worker JVM's.
>
> The primary question I wanted to ask is does Spark make a second copy of
> data, so language-specific daemons can operate on the data? What are some
> of the other limitations encountered when we try to offer multi-language
> support, whether it's in performance or in general software architecture.
> With python in particular the collect operation must be first written to
> disk and then read back from the python driver process.
>
> Would appreciate any insight on this, and if there is any work happening
> in this area.
>
> Thank you,
>
> Rahul Palamuttam
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Support-of-other-languages-tp24599.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For 

Input parsing time

2015-09-17 Thread Carlos Eduardo Santos
Hi,

I am only loading a JSON file and running one query. I would like to know how much
time is spent on reading, decompressing (e.g. a bz2 file) and parsing the
file before the query begins to execute. I have the impression that all
processing time (parsing the input and running the query) is included in
the "Executor Computing Time" in the History Server.

Do you recommend any documentation to understand better the History Server
logs and maybe more stats included in the log files?

Thanks in advance,
Carlos Eduardo M. Santos
CS PhD student


Re: How to convert dataframe to a nested StructType schema

2015-09-17 Thread Hao Wang
Thanks, Terry. This is exactly what I need :)

Hao

On Tue, Sep 15, 2015 at 8:47 PM, Terry Hole  wrote:

> Hao,
>
> For spark 1.4.1, you can try this:
> val rowrdd = df.rdd.map(r => Row(Row(r(3)), Row(r(0), r(1), r(2
> val newDF = sqlContext.createDataFrame(rowrdd, yourNewSchema)
>
> Thanks!
>
> - Terry
>
> On Wed, Sep 16, 2015 at 2:10 AM, Hao Wang  wrote:
>
>> Hi,
>>
>> I created a dataframe with 4 string columns (city, state, country,
>> zipcode).
>> I then applied the following nested schema to it by creating a custom
>> StructType. When I run df.take(5), it gives the exception below as
>> expected.
>> The question is how I can convert the Rows in the dataframe to conform to
>> this nested schema? Thanks!
>>
>> root
>>  |-- ZipCode: struct (nullable = true)
>>  ||-- zip: string (nullable = true)
>>  |-- Address: struct (nullable = true)
>>  ||-- city: string (nullable = true)
>>  ||-- state: string (nullable = true)
>>  ||-- country: string (nullable = true)
>>
>> [info]   org.apache.spark.SparkException: Job aborted due to stage
>> failure:
>> Task 0 in stage 6.0 failed 1 times, most recent failure: Lost task 0.0 in
>> stage 6.0 (TID 6, localhost): scala.MatchError: 95123 (of class
>> java.lang.String)
>> [info] at
>>
>> org.apache.spark.sql.catalyst.CatalystTypeConverters$$anonfun$createToCatalystConverter$4.apply(CatalystTypeConverters.scala:178)
>> [info] at
>>
>> org.apache.spark.sql.catalyst.CatalystTypeConverters$.convertRowWithConverters(CatalystTypeConverters.scala:348)
>> [info] at
>>
>> org.apache.spark.sql.catalyst.CatalystTypeConverters$$anonfun$createToCatalystConverter$4.apply(CatalystTypeConverters.scala:180)
>> [info] at
>> org.apache.spark.sql.SQLContext$$anonfun$9.apply(SQLContext.scala:488)
>> [info] at
>> org.apache.spark.sql.SQLContext$$anonfun$9.apply(SQLContext.scala:488)
>>
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-convert-dataframe-to-a-nested-StructType-schema-tp24703.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>


Re: Spark Streaming application code change and stateful transformations

2015-09-17 Thread Adrian Tanase
This section in the streaming guide also outlines a new option – use 2 versions 
in parallel for a period of time, controlling the draining / transition in the 
application level.
http://spark.apache.org/docs/latest/streaming-programming-guide.html#upgrading-application-code

Also – I would not dismiss the graceful shutdown approach, since you’re 
controlling the shutdown.
At a minimum, you can monitor if it was successful and if it failed, you simply 
restart the app, relying on checkpoint recovery before trying again…

I’m copy-pasting more details from an answer I posted earlier to a similar 
question:

  1.  Use 2 versions in parallel, drain the queue up to a point and start fresh 
in the new version, only processing events from that point forward
 *   Note that “up to a point” is specific to your state management logic; 
it might mean “user sessions started after 4 am” NOT “events received after 4 am”
  2.  Graceful shutdown and saving data to DB, followed by checkpoint cleanup / 
new checkpoint dir
 *   On restart, you need to use the updateStateByKey that takes an 
initialRdd with the values preloaded from DB
 *   By cleaning the checkpoint in between upgrades, data is loaded only 
once

Hope this helps,
-adrian

From: Ofir Kerker
Date: Wednesday, September 16, 2015 at 6:12 PM
To: Cody Koeninger
Cc: "user@spark.apache.org"
Subject: Re: Spark Streaming application code change and stateful 
transformations

Thanks Cody!
The 2nd solution is safer but seems wasteful :/
I'll try to optimize it by keeping, in addition to the 'last-complete-hour', the 
corresponding offsets that bound the incomplete data, so I can fast-forward 
only the last couple of hours in the worst case.

On Mon, Sep 14, 2015 at 22:14 Cody Koeninger 
> wrote:
Solution 2 sounds better to me.  You aren't always going to have graceful 
shutdowns.

On Mon, Sep 14, 2015 at 1:49 PM, Ofir Kerker 
> wrote:
Hi,
My Spark Streaming application consumes messages (events) from Kafka every
10 seconds using the direct stream approach and aggregates these messages
into hourly aggregations (to answer analytics questions like: "How many
users from Paris visited page X between 8PM to 9PM") and save the data to
Cassandra.

I was wondering if there's a good practice for handling a code change in a
Spark Streaming applications that uses stateful transformations
(updateStateByKey for example) because the new application code will not be
able to use the data that was checkpointed by the former application.
I have thought of a few solutions for this issue and was hoping some of you
have some experience with such case and can suggest other solutions or
feedback my suggested solutions:
*Solution #1*: On a graceful shutdown, in addition to the current Kafka
offsets, persist the current aggregated data into Cassandra tables
(different than the regular aggregation tables) that would allow reading
them easily when the new application starts in order to build the initial
state.
*Solution #2*: When an hour is "complete" (i.e not expecting more events
with the timestamp of this hour), update somewhere persistent (DB / shared
file) the last-complete-hour. This will allow me, when the new application
starts, to read all the events from Kafka from the beginning of retention
period (last X hours) and ignore events from timestamp smaller or equal than
the last-complete-hour.

I'll be happy to get your feedback!

Thanks,
Ofir




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-application-code-change-and-stateful-transformations-tp24692.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: 
user-unsubscr...@spark.apache.org
For additional commands, e-mail: 
user-h...@spark.apache.org




Spark Web UI + NGINX

2015-09-17 Thread Renato Perini
Hello!
I'm trying to set up a reverse proxy (using nginx) for the Spark Web UI.
I have 2 machines:
   1) Machine A, with a public IP. This machine will be used to access
Spark Web UI on the Machine B through its private IP address.
   2) Machine B, where Spark is installed (standalone master cluster, 1
worker node and the history server) not accessible from the outside.

Basically I want to access the Spark Web UI through my Machine A using the
URL:
http://machine_A_ip_address/spark

Any advised setup for Spark Web UI + nginx?

Thank you.


Re: Saprk.frame.Akkasize

2015-09-17 Thread Adrian Tanase
Have you reviewed this section of the guide?
http://spark.apache.org/docs/latest/programming-guide.html#shared-variables

If the dataset is static and you need a copy on all the nodes, you should look 
at broadcast variables.

SQL specific, have you tried loading the dataset using the DataFrame API 
directly? It seems to me like you’re using the 2 files as “metadata” instead of 
data…
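
A minimal Scala sketch of the broadcast approach, assuming the labels file fits comfortably in driver memory (loadLabels, images and lookupLabel are placeholders, not part of the original example):

val labels: Array[Byte] = loadLabels("file:///root/Downloads/Database/labels-idx1-ubyte")
val labelsBc = sc.broadcast(labels)   // shipped to each node once, not with every task

val labelled = images.map { img =>
  val allLabels = labelsBc.value      // read locally on the executor
  (img, lookupLabel(allLabels, img))
}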

-adrian

From: Angel Angel
Date: Thursday, September 17, 2015 at 12:28 PM
To: "user@spark.apache.org"
Subject: Saprk.frame.Akkasize

Hi,

I am running some deep learning algorithm on spark.

Example:
https://github.com/deeplearning4j/dl4j-spark-ml-examples


i am trying to run this example in local mode and its working fine.
but when i try to run this example in cluster mode i got following error.

Loaded Mnist dataframe:
15/09/17 18:20:33 WARN TaskSetManager: Stage 0 contains a task of very large 
size (46279 KB). The maximum recommended task size is 100 KB.
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to 
stage failure: Serialized task 0:0 was 47622358 bytes, which exceeds max 
allowed: spark.akka.frameSize (10485760 bytes) - reserved (204800 bytes). 
Consider increasing spark.akka.frameSize or using broadcast variables for large 
values.
at 
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1266)

Also i have attached the snapshot of error.

And my java driver program is


  public static void main(String[] args) {

    SparkConf conf = new SparkConf()
        .setMaster("spark://hadoopm0:7077")
        .setAppName("Mnist Classification Pipeline (Java)");
    SparkContext jsc = new SparkContext(conf);
    SQLContext jsql = new SQLContext(jsc);

    // String imagesPath = "hdfs://hadoopm0:8020/tmp/input1/images-idx1-ubyte";
    // String labelsPath = "hdfs://hadoopm0:8020/tmp/input1/labels-idx1-ubyte";
    String imagesPath = "file:///root/Downloads/Database/images-idx1-ubyte";
    String labelsPath = "file:///root/Downloads/Database/labels-idx1-ubyte";
    Map<String, String> params = new HashMap<String, String>();
    params.put("imagesPath", imagesPath);
    params.put("labelsPath", labelsPath);
    DataFrame data = jsql.read().format(DefaultSource.class.getName())
        .options(params).load();

    System.out.println("\nLoaded Mnist dataframe:");
    data.show(100);






Please give me some reference or suggestions to solve this problem.

Thanks in advance



Re: How to speed up MLlib LDA?

2015-09-17 Thread Marko Asplund
Hi Feynman,

I just tried that, but there wasn't a noticeable change in training
performance. On the other hand model loading time was reduced to ~ 5
seconds from ~ 2 minutes (now persisted as LocalLDAModel).

However, query / prediction time was unchanged.
Unfortunately, this is the critical performance characteristic in our case.

marko


On 15 September 2015 at 19:26, Feynman Liang  wrote:

> Hi Marko,
>
> I haven't looked into your case in much detail but one immediate thought
> is: have you tried the OnlineLDAOptimizer? It's implementation and
> resulting LDA model (LocalLDAModel) is quite different (doesn't depend on
> GraphX, assumes the model fits on a single machine) so you may see
> performance differences.
>
> Feynman
>
>


Re: How to recovery DStream from checkpoint directory?

2015-09-17 Thread Bin Wang
In my understanding, I have only three options to keep the DStream state
between redeploys (yes, I'm using updateStateByKey):

1. Use checkpoint.
2. Use my own database.
3. Use both.

But none of these options are great:

1. Use checkpoint: I cannot load it after a code change, or I need to keep
the structure of the classes unchanged, which seems to be impossible in a
project under development.
2. Use my own database: there may be a failure between the program reading data
from Kafka and saving the DStream to the database, so there may be data loss.
3. Use both: will the data be loaded twice? How can I know in which
situation I should use which one?

The power of checkpointing seems to be very limited. Is there any plan to
support checkpoint recovery when classes are changed, like the discussion you
pointed me to suggested?



Akhil Das 于2015年9月17日周四 下午3:26写道:

> Any kind of changes to the jvm classes will make it fail. By checkpointing
> the data you mean using checkpoint with updateStateByKey? Here's a similar
> discussion happened earlier which will clear your doubts i guess
> http://mail-archives.us.apache.org/mod_mbox/spark-user/201507.mbox/%3CCA+AHuK=xoy8dsdaobmgm935goqytaaqkpqsvdaqpmojottj...@mail.gmail.com%3E
>
> Thanks
> Best Regards
>
> On Thu, Sep 17, 2015 at 10:01 AM, Bin Wang  wrote:
>
>> And here is another question. If I load the DStream from database every
>> time I start the job, will the data be loaded when the job is failed and
>> auto restart? If so, both the checkpoint data and database data are loaded,
>> won't this a problem?
>>
>>
>>
>> Bin Wang 于2015年9月16日周三 下午8:40写道:
>>
>>> Will StreamingContex.getOrCreate do this work?What kind of code change
>>> will make it cannot load?
>>>
>>> Akhil Das 于2015年9月16日周三 20:20写道:
>>>
 You can't really recover from checkpoint if you alter the code. A
 better approach would be to use some sort of external storage (like a db or
 zookeeper etc) to keep the state (the indexes etc) and then when you deploy
 new code they can be easily recovered.

 Thanks
 Best Regards

 On Wed, Sep 16, 2015 at 3:52 PM, Bin Wang  wrote:

> I'd like to know if there is a way to recovery dstream from checkpoint.
>
> Because I stores state in DStream, I'd like the state to be recovered
> when I restart the application and deploy new code.
>


>


Re: Input parsing time

2015-09-17 Thread Adrian Tanase
You’re right – everything is captured under Executor Computing Time if it’s 
your app code.

I know that some people have used custom builds of Spark that add more timers – 
they show up nicely in the Spark UI.

A more lightweight approach is to time it yourself via some counters / 
accumulators and log that information.
http://spark.apache.org/docs/latest/programming-guide.html#accumulators-a-nameaccumlinka
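
A rough sketch of that approach (parseJson and the input path are placeholders for your own parsing logic and data; note that task retries can double-count accumulator updates made inside transformations):

val parseTimeMs = sc.accumulator(0L, "JSON parse time (ms)")

val records = sc.textFile("hdfs:///data/input.json.bz2").map { line =>
  val start = System.currentTimeMillis()
  val record = parseJson(line)
  parseTimeMs += System.currentTimeMillis() - start
  record
}

records.count()   // force evaluation
println(s"Parse time summed across all tasks: ${parseTimeMs.value} ms")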

-adrian

From: Carlos Eduardo Santos
Reply-To: "c...@cemshost.com.br"
Date: Thursday, September 17, 2015 at 10:09 AM
To: "user@spark.apache.org"
Subject: Input parsing time

Hi,

I only loading a JSON and running one query. I would like to know how much time 
is spent on reading, decompressing (e.g. bz2 file) and parsing the file before 
the query begins to execute. I have the impression that all processing time 
(parsing the input and running the query) is included in the "Executor 
Computing Time" in History Server.

Do you recommend any documentation to understand better the History Server logs 
and maybe more stats included in the log files?

Thanks in advance,
Carlos Eduardo M. Santos
CS PhD student


Saprk.frame.Akkasize

2015-09-17 Thread Angel Angel
Hi,

I am running some deep learning algorithm on spark.

Example:
https://github.com/deeplearning4j/dl4j-spark-ml-examples


I am trying to run this example in local mode and it's working fine,
but when I try to run this example in cluster mode I get the following error.

Loaded Mnist dataframe:
15/09/17 18:20:33 WARN TaskSetManager: Stage 0 contains a task of very
large size (46279 KB). The maximum recommended task size is 100 KB.
Exception in thread "main" org.apache.spark.SparkException: Job aborted due
to stage failure: Serialized task 0:0 was 47622358 bytes, which exceeds max
allowed: spark.akka.frameSize (10485760 bytes) - reserved (204800 bytes).
Consider increasing spark.akka.frameSize or using broadcast variables for
large values.
at org.apache.spark.scheduler.DAGScheduler.org
$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1266)

Also i have attached the snapshot of error.

And my java driver program is


  public static void main(String[] args) {

    SparkConf conf = new SparkConf()
        .setMaster("spark://hadoopm0:7077")
        .setAppName("Mnist Classification Pipeline (Java)");
    SparkContext jsc = new SparkContext(conf);
    SQLContext jsql = new SQLContext(jsc);

    // String imagesPath = "hdfs://hadoopm0:8020/tmp/input1/images-idx1-ubyte";
    // String labelsPath = "hdfs://hadoopm0:8020/tmp/input1/labels-idx1-ubyte";
    String imagesPath = "file:///root/Downloads/Database/images-idx1-ubyte";
    String labelsPath = "file:///root/Downloads/Database/labels-idx1-ubyte";
    Map<String, String> params = new HashMap<String, String>();
    params.put("imagesPath", imagesPath);
    params.put("labelsPath", labelsPath);
    DataFrame data = jsql.read().format(DefaultSource.class.getName())
        .options(params).load();

    System.out.println("\nLoaded Mnist dataframe:");
    data.show(100);






Please give me some reference or suggestions to solve this problem.

Thanks in advance

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org

Re: spark performance - executor computing time

2015-09-17 Thread Adrian Tanase
Something similar happened to our job as well - spark streaming, YARN deployed 
on AWS.
One of the jobs was consistently taking 10–15X longer on one machine. Same 
data volume, data partitioned really well, etc.

Are you running on AWS or on prem?

We were assuming that one of the VMs in Amazon was flaky and decided to restart 
it, leading to a host of other issues (the executor on it was never recreated 
after the machine joined back in YARN as a healthy node…)

-adrian

From: Robin East
Date: Wednesday, September 16, 2015 at 7:45 PM
To: patcharee
Cc: "user@spark.apache.org"
Subject: Re: spark performance - executor computing time

Is this repeatable? Do you always get one or two executors that are 6 times as 
slow? It could be that some of your tasks have more work to do (maybe you are 
filtering some records out?). If it’s always one particular worker node, is there 
something about the machine configuration (e.g. CPU speed) that means the 
processing takes longer?

—
Robin East
Spark GraphX in Action Michael S Malak and Robin East
http://www.manning.com/books/spark-graphx-in-action

On 15 Sep 2015, at 12:35, patcharee 
> wrote:

Hi,

I was running a job (on Spark 1.5 + YARN + Java 8). In a stage that performs a lookup 
(org.apache.spark.rdd.PairRDDFunctions.lookup(PairRDDFunctions.scala:873)), 
there was an executor whose computing time was more than 6 times the 
median. This executor had almost the same shuffle read size and low GC time as 
the others.

What can impact the executor computing time? Any suggestions what parameters I 
should monitor/configure?

BR,
Patcharee



-
To unsubscribe, e-mail: 
user-unsubscr...@spark.apache.org
For additional commands, e-mail: 
user-h...@spark.apache.org




Re: How to recovery DStream from checkpoint directory?

2015-09-17 Thread Adrian Tanase
This section in the streaming guide makes your options pretty clear
http://spark.apache.org/docs/latest/streaming-programming-guide.html#upgrading-application-code


  1.  Use 2 versions in parallel, drain the queue up to a point and start fresh 
in the new version, only processing events from that point forward
 *   Note that “up to a point” is specific to your state management logic; 
it might mean “user sessions started after 4 am” NOT “events received after 4 am”
  2.  Graceful shutdown and saving data to DB, followed by checkpoint cleanup / 
new checkpoint dir
 *   On restart, you need to use the updateStateByKey that takes an 
initialRdd with the values preloaded from DB (see the sketch after this list)
 *   By cleaning the checkpoint in between upgrades, data is loaded only 
once
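
A rough sketch of option 2 (loadStateFromDb and events, a DStream[(String, Long)], are placeholders; it assumes the Spark 1.3+ updateStateByKey overload that accepts an initial RDD):

import org.apache.spark.HashPartitioner
import org.apache.spark.rdd.RDD

// On restart with a clean checkpoint dir, preload the previous state from the DB
// and hand it to updateStateByKey so state survives the upgrade and loads only once.
val initialState: RDD[(String, Long)] = loadStateFromDb(ssc.sparkContext)

val updateFunc: (Seq[Long], Option[Long]) => Option[Long] =
  (newValues, state) => Some(state.getOrElse(0L) + newValues.sum)

val counts = events.updateStateByKey(
  updateFunc,
  new HashPartitioner(ssc.sparkContext.defaultParallelism),
  initialState)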

Hope this helps,
-adrian

From: Bin Wang
Date: Thursday, September 17, 2015 at 11:27 AM
To: Akhil Das
Cc: user
Subject: Re: How to recovery DStream from checkpoint directory?

In my understand, here I have only three options to keep the DStream state 
between redeploys (yes, I'm using updateStateByKey):

1. Use checkpoint.
2. Use my own database.
3. Use both.

But none of  these options are great:

1. Use checkpoint: I cannot load it after code change. Or I need to keep the 
structure of the classes, which seems to be impossible in a developing project.
2. Use my own database: there may be failure between the program read data from 
Kafka and save the DStream to database. So there may have data lose.
3. Use both: Will the data load two times? How can I know in which situation I 
should use the which one?

The power of checkpoint seems to be very limited. Is there any plan to support 
checkpoint while class is changed, like the discussion you gave me pointed out?



Akhil Das 
>于2015年9月17日周四 
下午3:26写道:
Any kind of changes to the jvm classes will make it fail. By checkpointing the 
data you mean using checkpoint with updateStateByKey? Here's a similar 
discussion happened earlier which will clear your doubts i guess 
http://mail-archives.us.apache.org/mod_mbox/spark-user/201507.mbox/%3CCA+AHuK=xoy8dsdaobmgm935goqytaaqkpqsvdaqpmojottj...@mail.gmail.com%3E

Thanks
Best Regards

On Thu, Sep 17, 2015 at 10:01 AM, Bin Wang 
> wrote:
And here is another question. If I load the DStream from database every time I 
start the job, will the data be loaded when the job is failed and auto restart? 
If so, both the checkpoint data and database data are loaded, won't this a 
problem?



Bin Wang >于2015年9月16日周三 下午8:40写道:

Will StreamingContex.getOrCreate do this work?What kind of code change will 
make it cannot load?

Akhil Das 
>于2015年9月16日周三 
20:20写道:
You can't really recover from checkpoint if you alter the code. A better 
approach would be to use some sort of external storage (like a db or zookeeper 
etc) to keep the state (the indexes etc) and then when you deploy new code they 
can be easily recovered.

Thanks
Best Regards

On Wed, Sep 16, 2015 at 3:52 PM, Bin Wang 
> wrote:
I'd like to know if there is a way to recovery dstream from checkpoint.

Because I stores state in DStream, I'd like the state to be recovered when I 
restart the application and deploy new code.




Re: How to recovery DStream from checkpoint directory?

2015-09-17 Thread Bin Wang
Thanks Adrian, the hint to use updateStateByKey with an initialRdd helps a lot!

Adrian Tanase 于2015年9月17日周四 下午4:50写道:

> This section in the streaming guide makes your options pretty clear
>
> http://spark.apache.org/docs/latest/streaming-programming-guide.html#upgrading-application-code
>
>
>1. Use 2 versions in parallel, drain the queue up to a point and strat
>fresh in the new version, only processing events from that point forward
>   1. Note that “up to a point” is specific to you state management
>   logic, it might mean “user sessions stated after 4 am” NOT “events 
> received
>   after 4 am”
>2. Graceful shutdown and saving data to DB, followed by checkpoint
>cleanup / new checkpoint dir
>   1. On restat, you need to use the updateStateByKey that takes an
>   initialRdd with the values preloaded from DB
>   2. By cleaning the checkpoint in between upgrades, data is loaded
>   only once
>
> Hope this helps,
> -adrian
>
> From: Bin Wang
> Date: Thursday, September 17, 2015 at 11:27 AM
> To: Akhil Das
> Cc: user
> Subject: Re: How to recovery DStream from checkpoint directory?
>
> In my understand, here I have only three options to keep the DStream state
> between redeploys (yes, I'm using updateStateByKey):
>
> 1. Use checkpoint.
> 2. Use my own database.
> 3. Use both.
>
> But none of  these options are great:
>
> 1. Use checkpoint: I cannot load it after code change. Or I need to keep
> the structure of the classes, which seems to be impossible in a developing
> project.
> 2. Use my own database: there may be failure between the program read data
> from Kafka and save the DStream to database. So there may have data lose.
> 3. Use both: Will the data load two times? How can I know in which
> situation I should use the which one?
>
> The power of checkpoint seems to be very limited. Is there any plan to
> support checkpoint while class is changed, like the discussion you gave me
> pointed out?
>
>
>
> Akhil Das 于2015年9月17日周四 下午3:26写道:
>
>> Any kind of changes to the jvm classes will make it fail. By
>> checkpointing the data you mean using checkpoint with updateStateByKey?
>> Here's a similar discussion happened earlier which will clear your doubts i
>> guess
>> http://mail-archives.us.apache.org/mod_mbox/spark-user/201507.mbox/%3CCA+AHuK=xoy8dsdaobmgm935goqytaaqkpqsvdaqpmojottj...@mail.gmail.com%3E
>>
>> Thanks
>> Best Regards
>>
>> On Thu, Sep 17, 2015 at 10:01 AM, Bin Wang  wrote:
>>
>>> And here is another question. If I load the DStream from database every
>>> time I start the job, will the data be loaded when the job is failed and
>>> auto restart? If so, both the checkpoint data and database data are loaded,
>>> won't this a problem?
>>>
>>>
>>>
>>> Bin Wang 于2015年9月16日周三 下午8:40写道:
>>>
 Will StreamingContex.getOrCreate do this work?What kind of code change
 will make it cannot load?

 Akhil Das 于2015年9月16日周三 20:20写道:

> You can't really recover from checkpoint if you alter the code. A
> better approach would be to use some sort of external storage (like a db 
> or
> zookeeper etc) to keep the state (the indexes etc) and then when you 
> deploy
> new code they can be easily recovered.
>
> Thanks
> Best Regards
>
> On Wed, Sep 16, 2015 at 3:52 PM, Bin Wang  wrote:
>
>> I'd like to know if there is a way to recovery dstream from
>> checkpoint.
>>
>> Because I stores state in DStream, I'd like the state to be recovered
>> when I restart the application and deploy new code.
>>
>
>
>>


a document for JDK version testing status

2015-09-17 Thread luohui20001
Hi there,

I remember there was a document showing which JDK versions are used and their 
testing status in various companies' Spark clusters. However, I couldn't find it 
on the Spark website or at Databricks. Does anyone still remember that document 
and wouldn't mind providing a link?

Thanks.


 

Thanks and best regards!
San.Luo


Error with twitter streaming

2015-09-17 Thread Deepak Subhramanian
I am getting an error with Twitter streaming with Spark 1.4 and
twitter4j 3.0.6. There is another thread which also pointed out this error.
The error happened after the streaming had run for more than 12 hours.
Here is the error log. I will try twitter4j 3.0.3 as per the link below
and see how it goes.


http://mail-archives.us.apache.org/mod_mbox/spark-user/201407.mbox/%3CCAMwrk0=MpFU8YbcQPWibPoiwL4oRubO3r=tjz9rzgc1hc-m...@mail.gmail.com%3E



15/09/17 08:00:02 ERROR BlockGenerator: Error in block pushing thread

java.io.NotSerializableException: twitter4j.internal.json.ScopesImpl

Serialization stack:

- object not serializable (class: twitter4j.internal.json.ScopesImpl,
value: twitter4j.internal.json.ScopesImpl@357a15bb)

- field (class: twitter4j.internal.json.StatusJSONImpl, name: scopes,
type: interface twitter4j.Scopes)

- object (class twitter4j.internal.json.StatusJSONImpl,
StatusJSONImpl{createdAt=Thu Sep 17 08:00:02 BST 2015,
id=644405431868125184, text='Visiting Southampton Boat Show? Start
your day off with a Full English Breakfast for just £4.95
#southamptonboatshow #casino  #breakfast', source='<a href="http://sproutsocial.com" rel="nofollow">Sprout Social</a>',
isTruncated=false, inReplyToStatusId=-1, inReplyToUserId=-1,
isFavorited=false, isRetweeted=false, favoriteCount=0,
inReplyToScreenName='null', geoLocation=null, place=null,
retweetCount=0, isPossiblySensitive=false, isoLanguageCode='null',
lang='en', contributorsIDs=[], retweetedStatus=null,
userMentionEntities=[], urlEntities=[],
hashtagEntities=[HashtagEntityJSONImpl{text='southamptonboatshow'},
HashtagEntityJSONImpl{text='casino'},
HashtagEntityJSONImpl{text='breakfast'}], mediaEntities=[],
symbolEntities=[], currentUserRetweetId=-1,
user=UserJSONImpl{id=66352550, name='GrosvenorSouthampton',
screenName='GCSouthampton', location='Southampton UK',
description='Southampton's most exciting & welcoming venue! Boasting a
fantastic restaurant, electronic & live gaming, regular poker games,
late night bar & much more', isContributorsEnabled=false,
profileImageUrl='http://pbs.twimg.com/profile_images/589163086365335553/V-Dm2O5A_normal.jpg',
profileImageUrlHttps='https://pbs.twimg.com/profile_images/589163086365335553/V-Dm2O5A_normal.jpg',
url='http://www.grosvenorcasinos.com/local-casinos/southampton',
isProtected=false, followersCount=1242, status=null,
profileBackgroundColor='131516', profileTextColor='33',
profileLinkColor='42D3FF', profileSidebarFillColor='EFEFEF',
profileSidebarBorderColor='EE', profileUseBackgroundImage=true,
showAllInlineMedia=false, friendsCount=362, createdAt=Mon Aug 17
14:23:55 BST 2009, favouritesCount=105, utcOffset=-1, timeZone='null',
profileBackgroundImageUrl='http://pbs.twimg.com/profile_background_images/530851272/Twitter_Background_Southampton.jpg',
profileBackgroundImageUrlHttps='https://pbs.twimg.com/profile_background_images/530851272/Twitter_Background_Southampton.jpg',
profileBackgroundTiled=true, lang='en', statusesCount=6927,
isGeoEnabled=false, isVerified=false, translator=false,
listedCount=19, isFollowRequestSent=false}})

at 
org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)

at 
org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)

at 
org.apache.spark.serializer.SerializationStream.writeAll(Serializer.scala:153)

at 
org.apache.spark.storage.BlockManager.dataSerializeStream(BlockManager.scala:1189)

at org.apache.spark.storage.BlockManager.dataSerialize(BlockManager.scala:1198)

at org.apache.spark.storage.MemoryStore.putArray(MemoryStore.scala:131)

at org.apache.spark.storage.MemoryStore.putIterator(MemoryStore.scala:168)

at org.apache.spark.storage.MemoryStore.putIterator(MemoryStore.scala:142)

at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:790)

at org.apache.spark.storage.BlockManager.putIterator(BlockManager.scala:637)

at 
org.apache.spark.streaming.receiver.BlockManagerBasedBlockHandler.storeBlock(ReceivedBlockHandler.scala:77)

at 
org.apache.spark.streaming.receiver.ReceiverSupervisorImpl.pushAndReportBlock(ReceiverSupervisorImpl.scala:141)

at 
org.apache.spark.streaming.receiver.ReceiverSupervisorImpl.pushArrayBuffer(ReceiverSupervisorImpl.scala:112)

at 
org.apache.spark.streaming.receiver.ReceiverSupervisorImpl$$anon$2.onPushBlock(ReceiverSupervisorImpl.scala:97)

at 
org.apache.spark.streaming.receiver.BlockGenerator.pushBlock(BlockGenerator.scala:198)

at 
org.apache.spark.streaming.receiver.BlockGenerator.org$apache$spark$streaming$receiver$BlockGenerator$$keepPushingBlocks(BlockGenerator.scala:171)

at 
org.apache.spark.streaming.receiver.BlockGenerator$$anon$1.run(BlockGenerator.scala:89)

15/09/17 08:00:02 WARN ReceiverSupervisorImpl: Reported error Error in
block pushing thread - java.io.NotSerializableException:
twitter4j.internal.json.ScopesImpl

Serialization stack:

- object not serializable (class: twitter4j.internal.json.ScopesImpl,
value: 

Re: [Spark Streaming] Distribute custom receivers evenly across excecutors

2015-09-17 Thread patrizio.munzi
Hi, did you manage to get it working?
And do you know how this works on Spark 1.3?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Distribute-custom-receivers-evenly-across-excecutors-tp6671p24724.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark Streaming kafka directStream value decoder issue

2015-09-17 Thread srungarapu vamsi
@Adrian,
I am doing collect for debugging purposes. But I have to use foreachRDD so
that I can operate on top of this RDD and eventually save to the DB.

But my actual problem here is to properly convert Array[Byte] to my custom
object.

On Thu, Sep 17, 2015 at 7:04 PM, Adrian Tanase  wrote:

> Why are you calling foreachRdd / collect in the first place?
>
> Instead of using a custom decoder, you should simply do – this is code
> executed on the workers and allows the computation to continue. ForeachRdd
> and collect are output operations and force the data to be collected on the
> driver (assuming you don’t want that…)
>
> val events = kafkaDStream.map { case(devId,byteArray)=> 
> KafkaGenericEvent.parseFrom(byteArray) }
>
> From: srungarapu vamsi
> Date: Thursday, September 17, 2015 at 4:03 PM
> To: user
> Subject: Spark Streaming kafka directStream value decoder issue
>
> I am using KafkaUtils.createDirectStream to read the data from kafka bus.
>
> On the producer end, i am generating in the following way:
>
> props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
> props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
>   "org.apache.kafka.common.serialization.StringSerializer")
> props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
>   "org.apache.kafka.common.serialization.StringSerializer")
> val producer = new KafkaProducer[String, KafkaGenericEvent](props)
>
> // Send some messages
> println("Sending message")
> val kafkaGenericEvent = new 
> KafkaGenericEvent("event-id",EventType.site,"6",144050040L)
> val message = new ProducerRecord[String, 
> KafkaGenericEvent](topic,"myKey", kafkaGenericEvent)
> producer.send(message)
>   }
>
> I am connecting to kafka using the console consumer script and am able to
> see proper data. The KafkaGenericEvent used in the above code is  the class
> generated using ScalaBuff from a protobuff file.
>
> On the consumer end,
> If i read the value as a normal byte array and the convert it into
> KafkaGenericEvent in the following way, i get proper data:
>
>  val kafkaDStream  = 
> KafkaUtils.createDirectStream[String,Array[Byte],StringDecoder,DefaultDecoder](ssc,
>  kafkaConf, Set(topics))
>
> kafkaDStream.foreachRDD(rdd=>rdd.collect().map{
>   case(devId,byteArray)=>{
> println(KafkaGenericEvent.parseFrom(byteArray))
>   }
> })
>
> But if change the value to KafkaGenericEvent and use a custom decoder like
> this:
>
> class KafkaGenericEventsDecoder(props: VerifiableProperties = null) extends 
> Decoder[KafkaGenericEvent]{
>  override def fromBytes(bytes:Array[Byte]):KafkaGenericEvent = {
>KafkaGenericEvent.parseFrom(bytes)
>  }
> }
>
> and in consumer:
>
> val kafkaDStream  = 
> KafkaUtils.createDirectStream[String,KafkaGenericEvent,StringDecoder,KafkaGenericEventsDecoder](ssc,
>  kafkaConf, Set(topics))
> kafkaDStream foreachRDD(rdd=>rdd.collect().map{
>   case(devId,genericEvent)=>{
> println(genericEvent)
>   }
> })
>
> Now, i my value object KafkaGenericEvent   is not created based on the
> sent data instead it is creating an empty Object of KafkaGenericEvent with
> default values.
>
> Even if i read the value as array of bytes in the createDirectStream and
> than apply a transformation in the following way i am getting in correct
> values:
>
> val kafkaDStream  = 
> KafkaUtils.createDirectStream[String,Array[Byte],StringDecoder,DefaultDecoder](ssc,
>  kafkaConf, Set(topics))
>
> kafkaDStream.map{
>   case(devId,byteArray) =>(devId,KafkaGenericEvent.parseFrom(byteArray))
> } foreachRDD(rdd=>rdd.collect().map{
>   case(devId,genericEvent)=>{
> println(genericEvent)
>   }
> })
>
> I get the default KafkaGenericEvent Object in the line println
> (genericEvent)
> Does this mean that I can transform the values only on the driver and not
> on the executors?
>
> I am completely confused here!
> I am using :
>  scala-2.10.4
>  spark-1.3.1
>  kafka_2.10-0.8.2.1
>
> -
> /Vamsi
>



-- 
/Vamsi


Re: NGINX + Spark Web UI

2015-09-17 Thread Ruslan Dautkhanov
Similar setup for Hue
http://gethue.com/using-nginx-to-speed-up-hue-3-8-0/

Might give you an idea.



-- 
Ruslan Dautkhanov

On Thu, Sep 17, 2015 at 9:50 AM, mjordan79  wrote:

> Hello!
> I'm trying to set up a reverse proxy (using nginx) for the Spark Web UI.
> I have 2 machines:
> 1) Machine A, with a public IP. This machine will be used to access Spark
> Web UI on the Machine B through its private IP address.
> 2) Machine B, where Spark is installed (standalone master cluster, 1 worker
> node and the history server) not accessible from the outside.
>
> Basically I want to access the Spark Web UI through my Machine A using the
> URL:
> http://machine_A_ip_address/spark
>
> Currently I have this setup:
> http {
> proxy_set_header X-Real-IP $remote_addr;
> proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
> proxy_set_header Host $http_host;
> proxy_set_header X-NginX-Proxy true;
> proxy_set_header X-Ssl on;
> }
>
> # Master cluster node
> upstream app_master {
> server machine_B_ip_address:8080;
> }
>
> # Slave worker node
> upstream app_worker {
> server machine_B_ip_address:8081;
> }
>
> # Job UI
> upstream app_ui {
> server machine_B_ip_address:4040;
> }
>
> # History server
> upstream app_history {
> server machine_B_ip_address:18080;
> }
>
> I'm really struggling in figuring out a correct location directive to make
> the whole thing work, not only for accessing all ports using the url /spark
> but also in making the links in the web app be transformed accordingly. Any
> idea?
>
>
> Any help really appreciated.
> Thank you in advance.
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/NGINX-Spark-Web-UI-tp24726.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Checkpointing with Kinesis

2015-09-17 Thread Alan Dipert
Hello,
We are using Spark Streaming 1.4.1 in AWS EMR to process records from
Kinesis.  Our Spark program saves RDDs to S3, after which the records are
picked up by a Lambda function that loads them into Redshift.  That no data
is lost during processing is important to us.

We have set our Kinesis checkpoint interval to 15 minutes, which is also
our window size.

Unfortunately, checkpointing happens after receiving data from Kinesis, not
after we have successfully written to S3.  If batches back up in Spark, and
the cluster is terminated, whatever data was in-memory will be lost because
it was checkpointed but not actually saved to S3.

We are considering forking and modifying the kinesis-asl library with
changes that would allow us to perform the checkpoint manually and at the
right time.  We'd rather not do this.

Are we overlooking an easier way to deal with this problem?  Thank you in
advance for your insight!

Alan


Can we do dataframe.query like Pandas dataframe in spark?

2015-09-17 Thread Rex X
With a Pandas dataframe, we can do a query:

>>> from numpy.random import randn
>>> from pandas import DataFrame
>>> df = DataFrame(randn(10, 2), columns=list('ab'))
>>> df.query('a > b')


This SQL-select-like query is very convenient. Can we do a similar thing with
the new DataFrame in Spark?
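
For reference, a rough Scala sketch of the closest analogue I know of with Spark DataFrames, where filter/where accept a SQL expression string (assuming a SQLContext named sqlContext, as in the shells):

val df = sqlContext.range(0, 10).selectExpr("rand() as a", "rand() as b")
df.filter("a > b").show()   // or equivalently df.where("a > b")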


Best,
Rex


Re: How to calculate average from multiple values

2015-09-17 Thread diplomatic Guru
Hi Robin,

You are a star! Thank you for the explanation and example. I converted your
code into Java without any hassle. It is working as I expected. I carried
out the final calculation (5th/6th) using mapValues and it is working
nicely. But I was wondering: is there a better way to do it other than using
mapValues?
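
For reference, the final step looks roughly like this (a sketch; folded stands for the result of Robin's foldByKey kept as an RDD, i.e. without the .collect):

val withMean = folded.mapValues { v =>
  val parts = v.split(",")
  val meanOfFifth = parts(4).toDouble / parts(5).toDouble
  s"${parts(0)},${parts(1)},${parts(2)},${parts(3)},$meanOfFifth"
}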

Cheers,

Raj


On 16 September 2015 at 20:13, Robin East  wrote:

> One way is to use foldByKey which is similar to reduceByKey but you supply
> a ‘zero’ value for the start of the computation. The idea is to add an
> extra element to the returned string to represent the count of the 5th
> element. You can then use the 5th and 6th elements to calculate the mean.
> The ‘zero’ value you supply to foldByKey is the all-zeros string
> “0,0,0,0,0,0”.
>
> Below is some example scala code that implements this idea - I’m sure
> Spark Java experts on the forum could turn this into the equivalent Java.
>
> initial.foldByKey("0,0,0,0,0,0")( (a,b) => {
> val iFirst = a.split(",")(0).toInt
> val iFirstB = b.split(",")(0).toInt
> val iFirth = a.split(",")(4).toInt
> val iFirthB = b.split(",")(4).toInt
> val countA  = if(a.split(",").size > 5) a.split(",")(5).toInt else 1
> val countB  = if(b.split(",").size > 5) b.split(",")(5).toInt else 1
> s"${iFirst + iFirstB},0,0,0,${iFirth + iFirthB},${countA + countB}"
>   }).collect
>
>
> This returns a collection of keys and 6 element strings where the 5th
> element is the sum of all the fifth entries and the 6th element is the
> running count of entries.
>
> ---
> Robin East
> *Spark GraphX in Action* Michael Malak and Robin East
> Manning Publications Co.
> http://www.manning.com/books/spark-graphx-in-action
>
>
>
>
>
> On 16 Sep 2015, at 15:46, diplomatic Guru 
> wrote:
>
>  have a mapper that emit key/value pairs(composite keys and composite
> values separated by comma).
>
> e.g
>
> *key:* a,b,c,d *Value:* 1,2,3,4,5
>
> *key:* a1,b1,c1,d1 *Value:* 5,4,3,2,1
>
> ...
>
> ...
>
> *key:* a,b,c,d *Value:* 5,4,3,2,1
>
>
> I could easily SUM these values using reduceByKey.
>
> e.g.
>
> reduceByKey(new Function2<String, String, String>() {
>
> @Override
> public String call(String value1, String value2) {
> String oldValue[] = value1.toString().split(",");
> String newValue[] = value2.toString().split(",");
>
> int iFirst = Integer.parseInt(oldValue[0]) + 
> Integer.parseInt(newValue[0]);
> int iSecond = Integer.parseInt(oldValue[1]) + 
> Integer.parseInt(newValue[1]);
> int iThird = Integer.parseInt(oldValue[2]) + 
> Integer.parseInt(newValue[2]);
> int iFourth = Integer.parseInt(oldValue[3]) + 
> Integer.parseInt(newValue[3]);
> int iFifth = Integer.parseInt(oldValue[4]) + 
> Integer.parseInt(newValue[4]);
>
> return iFirst  + "," + iSecond + ","
> + iThird+ "," + iFourth+ "," + iFifth;
>
> }
> });
>
> But the problem is how do I find average of just one of these values. Lets
> assume I want to SUM iFirst, iSecond, iThird and iFourth but I want to find
> Average of iFifth. How do i do it? With a simple key/value pairs I could
> use mapValues function but not sure how I could do it with my example.
> Please advice.
>
>
>


Re: Checkpointing with Kinesis

2015-09-17 Thread Aniket Bhatnagar
You can perhaps set up a WAL (write-ahead log) that logs to S3? The new cluster
should pick up the records that weren't processed due to the previous cluster
termination.
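
A rough sketch of that setup (bucket name, app name and batch interval are placeholders; it assumes the application is restarted via StreamingContext.getOrCreate from the same checkpoint directory so the logged blocks can be replayed):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf()
  .setAppName("kinesis-to-s3")
  .set("spark.streaming.receiver.writeAheadLog.enable", "true")

val ssc = new StreamingContext(conf, Seconds(10))
ssc.checkpoint("s3://my-bucket/streaming-checkpoints")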

Thanks,
Aniket

On Thu, Sep 17, 2015, 9:19 PM Alan Dipert  wrote:

> Hello,
> We are using Spark Streaming 1.4.1 in AWS EMR to process records from
> Kinesis.  Our Spark program saves RDDs to S3, after which the records are
> picked up by a Lambda function that loads them into Redshift.  That no data
> is lost during processing is important to us.
>
> We have set our Kinesis checkpoint interval to 15 minutes, which is also
> our window size.
>
> Unfortunately, checkpointing happens after receiving data from Kinesis,
> not after we have successfully written to S3.  If batches back up in Spark,
> and the cluster is terminated, whatever data was in-memory will be lost
> because it was checkpointed but not actually saved to S3.
>
> We are considering forking and modifying the kinesis-asl library with
> changes that would allow us to perform the checkpoint manually and at the
> right time.  We'd rather not do this.
>
> Are we overlooking an easier way to deal with this problem?  Thank you in
> advance for your insight!
>
> Alan
>


Re: Spark Streaming kafka directStream value decoder issue

2015-09-17 Thread Adrian Tanase
I guess what I'm asking is why not start with a Byte array like in the example 
that works (using the DefaultDecoder) then map over it and do the decoding 
manually like I'm suggesting below.

Have you tried this approach? We have the same workflow (kafka => protobuf => 
custom class) and it works.
If you expect invalid messages, you can use flatMap instead and wrap 
.parseFrom in a Try {}.toOption.
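
A rough sketch of that variant (KafkaGenericEvent.parseFrom as in the code quoted
below; undecodable messages are silently dropped):

import scala.util.Try

val events = kafkaDStream.flatMap { case (devId, byteArray) =>
  Try(KafkaGenericEvent.parseFrom(byteArray)).toOption.map(event => (devId, event))
}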

Sent from my iPhone

On 17 Sep 2015, at 18:23, srungarapu vamsi 
> wrote:

@Adrian,
I am doing collect for debugging purposes, but I have to use foreachRDD so that 
I can operate on top of this RDD and eventually save to a DB.

But my actual problem here is to properly convert Array[Byte] to my custom 
object.


On Thu, Sep 17, 2015 at 7:04 PM, Adrian Tanase 
> wrote:
Why are you calling foreachRdd / collect in the first place?

Instead of using a custom decoder, you should simply do - this is code executed 
on the workers and allows the computation to continue. ForeachRdd and collect 
are output operations and force the data to be collected on the driver 
(assuming you don't want that...)

val events = kafkaDStream.map { case(devId,byteArray)=> 
KafkaGenericEvent.parseFrom(byteArray) }

From: srungarapu vamsi
Date: Thursday, September 17, 2015 at 4:03 PM
To: user
Subject: Spark Streaming kafka directStream value decoder issue

I am using KafkaUtils.createDirectStream to read the data from kafka bus.

On the producer end, I am generating messages in the following way:

props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
  "org.apache.kafka.common.serialization.StringSerializer")
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
  "org.apache.kafka.common.serialization.StringSerializer")
val producer = new KafkaProducer[String, KafkaGenericEvent](props)

// Send some messages
println("Sending message")
val kafkaGenericEvent = new 
KafkaGenericEvent("event-id",EventType.site,"6",144050040L)
val message = new ProducerRecord[String, 
KafkaGenericEvent](topic,"myKey", kafkaGenericEvent)
producer.send(message)
  }

I am connecting to Kafka using the console consumer script and am able to see 
proper data. The KafkaGenericEvent used in the above code is the class 
generated using ScalaBuff from a protobuf file.

On the consumer end,
if I read the value as a normal byte array and then convert it into 
a KafkaGenericEvent in the following way, I get proper data:

 val kafkaDStream  = 
KafkaUtils.createDirectStream[String,Array[Byte],StringDecoder,DefaultDecoder](ssc,
 kafkaConf, Set(topics))

kafkaDStream.foreachRDD(rdd=>rdd.collect().map{
  case(devId,byteArray)=>{
println(KafkaGenericEvent.parseFrom(byteArray))
  }
})

But if I change the value type to KafkaGenericEvent and use a custom decoder like this:

class KafkaGenericEventsDecoder(props: VerifiableProperties = null) extends 
Decoder[KafkaGenericEvent]{
 override def fromBytes(bytes:Array[Byte]):KafkaGenericEvent = {
   KafkaGenericEvent.parseFrom(bytes)
 }
}

and in consumer:

val kafkaDStream  = 
KafkaUtils.createDirectStream[String,KafkaGenericEvent,StringDecoder,KafkaGenericEventsDecoder](ssc,
 kafkaConf, Set(topics))
kafkaDStream foreachRDD(rdd=>rdd.collect().map{
  case(devId,genericEvent)=>{
println(genericEvent)
  }
})

Now my value object KafkaGenericEvent is not created based on the sent 
data; instead an empty KafkaGenericEvent object with default 
values is created.

Even if I read the value as an array of bytes in createDirectStream and then 
apply a transformation in the following way, I am getting incorrect values:

val kafkaDStream  = 
KafkaUtils.createDirectStream[String,Array[Byte],StringDecoder,DefaultDecoder](ssc,
 kafkaConf, Set(topics))

kafkaDStream.map{
  case(devId,byteArray) =>(devId,KafkaGenericEvent.parseFrom(byteArray))
} foreachRDD(rdd=>rdd.collect().map{
  case(devId,genericEvent)=>{
println(genericEvent)
  }
})

I get the default KafkaGenericEvent Object in the line println(genericEvent)
Does this mean that I can transform the values only on the driver and not on 
the executors?

I am completely confused here!
I am using :
 scala-2.10.4
 spark-1.3.1
 kafka_2.10-0.8.2.1

-
/Vamsi




--
/Vamsi


NGINX + Spark Web UI

2015-09-17 Thread mjordan79
Hello!
I'm trying to set up a reverse proxy (using nginx) for the Spark Web UI.
I have 2 machines:
1) Machine A, with a public IP. This machine will be used to access Spark
Web UI on the Machine B through its private IP address.
2) Machine B, where Spark is installed (standalone master cluster, 1 worker
node and the history server) not accessible from the outside.

Basically I want to access the Spark Web UI through my Machine A using the
URL:
http://machine_A_ip_address/spark

Currently I have this setup:
http {
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header Host $http_host;
proxy_set_header X-NginX-Proxy true;
proxy_set_header X-Ssl on;
}

# Master cluster node
upstream app_master {
server machine_B_ip_address:8080;
}

# Slave worker node
upstream app_worker {
server machine_B_ip_address:8081;
}

# Job UI
upstream app_ui {
server machine_B_ip_address:4040;
}

# History server
upstream app_history {
server machine_B_ip_address:18080;
}

I'm really struggling to figure out a correct location directive that makes
the whole thing work, not only for accessing all the ports under the URL /spark
but also for making the links in the web app get rewritten accordingly. Any
ideas?


Any help really appreciated.
Thank you in advance.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/NGINX-Spark-Web-UI-tp24726.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark Streaming application code change and stateful transformations

2015-09-17 Thread Cody Koeninger
The reason I'm dismissing the graceful shutdown approach is that if your
app crashes, and can't be restarted without code changes (e.g. a bug needs
to be fixed), you're screwed.

On Thu, Sep 17, 2015 at 3:56 AM, Adrian Tanase  wrote:

> This section in the streaming guide also outlines a new option – use 2
> versions in parallel for a period of time, controlling the draining /
> transition in the application level.
>
> http://spark.apache.org/docs/latest/streaming-programming-guide.html#upgrading-application-code
>
> Also – I would not dismiss the graceful shutdown approach, since you’re
> controlling the shutdown.
> At a minimum, you can monitor if it was successful and if it failed, you
> simply restart the app, relying on checkpoint recovery before trying again…
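>
> For reference, the graceful stop itself is a one-liner (a sketch, assuming `ssc` is
> your StreamingContext); it waits for received data to be processed before exiting:
>
> ssc.stop(stopSparkContext = true, stopGracefully = true)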
>
> I’m copy-pasting more details from an answer I posted earlier to a similar
> question:
>
>    1. Use 2 versions in parallel, drain the queue up to a point and start
>    fresh in the new version, only processing events from that point forward
>       1. Note that “up to a point” is specific to your state management
>       logic; it might mean “user sessions started after 4 am” NOT “events
>       received after 4 am”
>2. Graceful shutdown and saving data to DB, followed by checkpoint
>cleanup / new checkpoint dir
>   1. On restart, you need to use the updateStateByKey that takes an
>   initialRdd with the values preloaded from DB (see the sketch after this list)
>   2. By cleaning the checkpoint in between upgrades, data is loaded
>   only once
>
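> A rough sketch of that initialRdd variant (names and types are illustrative only:
> `events` is assumed to be a keyed DStream[(String, Long)], and `loadStateFromDB`
> stands in for however you read the saved aggregates back):
>
> import org.apache.spark.HashPartitioner
> import org.apache.spark.rdd.RDD
>
> val initialState: RDD[(String, Long)] = loadStateFromDB(sc)  // hypothetical loader
>
> def updateFunc(values: Seq[Long], state: Option[Long]): Option[Long] =
>   Some(values.sum + state.getOrElse(0L))
>
> val counts = events.updateStateByKey[Long](
>   updateFunc _,
>   new HashPartitioner(ssc.sparkContext.defaultParallelism),
>   initialState)
>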
> Hope this helps,
> -adrian
>
> From: Ofir Kerker
> Date: Wednesday, September 16, 2015 at 6:12 PM
> To: Cody Koeninger
> Cc: "user@spark.apache.org"
> Subject: Re: Spark Streaming application code change and stateful
> transformations
>
> Thanks Cody!
> The 2nd solution is safer but seems wasteful :/
> I'll try to optimize it by keeping in addition to the 'last-complete-hour'
> the corresponding offsets that bound the incomplete data to try and
> fast-forward only the last couple of hours in the worst case.
>
> On Mon, Sep 14, 2015 at 22:14 Cody Koeninger  wrote:
>
>> Solution 2 sounds better to me.  You aren't always going to have graceful
>> shutdowns.
>>
>> On Mon, Sep 14, 2015 at 1:49 PM, Ofir Kerker 
>> wrote:
>>
>>> Hi,
>>> My Spark Streaming application consumes messages (events) from Kafka
>>> every
>>> 10 seconds using the direct stream approach and aggregates these messages
>>> into hourly aggregations (to answer analytics questions like: "How many
>>> users from Paris visited page X between 8PM to 9PM") and save the data to
>>> Cassandra.
>>>
>>> I was wondering if there's a good practice for handling a code change in
>>> a
>>> Spark Streaming applications that uses stateful transformations
>>> (updateStateByKey for example) because the new application code will not
>>> be
>>> able to use the data that was checkpointed by the former application.
>>> I have thought of a few solutions for this issue and was hoping some of
>>> you
>>> have some experience with such case and can suggest other solutions or
>>> feedback my suggested solutions:
>>> *Solution #1*: On a graceful shutdown, in addition to the current Kafka
>>> offsets, persist the current aggregated data into Cassandra tables
>>> (different than the regular aggregation tables) that would allow reading
>>> them easily when the new application starts in order to build the initial
>>> state.
>>> *Solution #2*: When an hour is "complete" (i.e not expecting more events
>>> with the timestamp of this hour), update somewhere persistent (DB /
>>> shared
>>> file) the last-complete-hour. This will allow me, when the new
>>> application
>>> starts, to read all the events from Kafka from the beginning of retention
>>> period (last X hours) and ignore events from timestamp smaller or equal
>>> than
>>> the last-complete-hour.
>>>
>>> I'll be happy to get your feedback!
>>>
>>> Thanks,
>>> Ofir
>>>
>>>
>>>
>>>
>>> --
>>> View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-application-code-change-and-stateful-transformations-tp24692.html
>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>
>>> -
>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>
>>>
>>


Re: Spark Streaming kafka directStream value decoder issue

2015-09-17 Thread Adrian Tanase
Why are you calling foreachRdd / collect in the first place?

Instead of using a custom decoder, you should simply do – this is code executed 
on the workers and allows the computation to continue. ForeachRdd and collect 
are output operations and force the data to be collected on the driver 
(assuming you don’t want that…)

val events = kafkaDStream.map { case(devId,byteArray)=> 
KafkaGenericEvent.parseFrom(byteArray) }

From: srungarapu vamsi
Date: Thursday, September 17, 2015 at 4:03 PM
To: user
Subject: Spark Streaming kafka directStream value decoder issue

I am using KafkaUtils.createDirectStream to read the data from kafka bus.

On the producer end, i am generating in the following way:

props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
  "org.apache.kafka.common.serialization.StringSerializer")
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
  "org.apache.kafka.common.serialization.StringSerializer")
val producer = new KafkaProducer[String, KafkaGenericEvent](props)

// Send some messages
println("Sending message")
val kafkaGenericEvent = new 
KafkaGenericEvent("event-id",EventType.site,"6",144050040L)
val message = new ProducerRecord[String, 
KafkaGenericEvent](topic,"myKey", kafkaGenericEvent)
producer.send(message)
  }

I am connecting to kafka using the console consumer script and am able to see 
proper data. The KafkaGenericEvent used in the above code is  the class 
generated using ScalaBuff from a protobuff file.

On the consumer end,
If i read the value as a normal byte array and the convert it into 
KafkaGenericEvent in the following way, i get proper data:

 val kafkaDStream  = 
KafkaUtils.createDirectStream[String,Array[Byte],StringDecoder,DefaultDecoder](ssc,
 kafkaConf, Set(topics))

kafkaDStream.foreachRDD(rdd=>rdd.collect().map{
  case(devId,byteArray)=>{
println(KafkaGenericEvent.parseFrom(byteArray))
  }
})

But if change the value to KafkaGenericEvent and use a custom decoder like this:

class KafkaGenericEventsDecoder(props: VerifiableProperties = null) extends 
Decoder[KafkaGenericEvent]{
 override def fromBytes(bytes:Array[Byte]):KafkaGenericEvent = {
   KafkaGenericEvent.parseFrom(bytes)
 }
}

and in consumer:

val kafkaDStream  = 
KafkaUtils.createDirectStream[String,KafkaGenericEvent,StringDecoder,KafkaGenericEventsDecoder](ssc,
 kafkaConf, Set(topics))
kafkaDStream foreachRDD(rdd=>rdd.collect().map{
  case(devId,genericEvent)=>{
println(genericEvent)
  }
})

Now, i my value object KafkaGenericEvent   is not created based on the sent 
data instead it is creating an empty Object of KafkaGenericEvent with default 
values.

Even if i read the value as array of bytes in the createDirectStream and than 
apply a transformation in the following way i am getting in correct values:

val kafkaDStream  = 
KafkaUtils.createDirectStream[String,Array[Byte],StringDecoder,DefaultDecoder](ssc,
 kafkaConf, Set(topics))

kafkaDStream.map{
  case(devId,byteArray) =>(devId,KafkaGenericEvent.parseFrom(byteArray))
} foreachRDD(rdd=>rdd.collect().map{
  case(devId,genericEvent)=>{
println(genericEvent)
  }
})

I get the default KafkaGenericEvent Object in the line println(genericEvent)
Does this mean that I can transform the values only on the driver and not on 
the executors?

I am completely confused here!
I am using :
 scala-2.10.4
 spark-1.3.1
 kafka_2.10-0.8.2.1

-
/Vamsi



Re: Spark wastes a lot of space (tmp data) for iterative jobs

2015-09-17 Thread Ali Hadian
Thanks, but as far as I know, checkpointing is specific to streaming RDDs and is 
not implemented in regular RDDs (just inherited from the superclass, but not 
implemented).

How can I checkpoint the intermediate JavaRDDs??

-Original Message-
From: Alexis Gillain 
To: Ali Hadian 
Cc: spark users 
Date: Thu, 17 Sep 2015 02:03:46 +0800
Subject: Re: Spark wastes a lot of space (tmp data) for iterative jobs

Ok just realized you don't use mllib pagerank.

You must use checkpointing as pointed in the databricks url.

https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/impl/PeriodicGraphCheckpointer.scala

Due to lineage, Spark doesn't erase the shuffle files.
When you do:
contrib = link.join(rank)
rank = contrib.map(...)
contrib = link.join(rank)
I think Spark doesn't erase the shuffle files of the first join because they 
are still part of the lineage of the second contrib through rank.
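
To make that concrete, here is a minimal, self-contained sketch (object name, dummy
data and the checkpoint path are illustrative only, not taken from the original job)
of truncating the lineage with plain RDD checkpointing, which is what lets old
shuffle files become eligible for cleanup:

import org.apache.spark.{SparkConf, SparkContext}

object CheckpointSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("checkpoint-sketch"))
    sc.setCheckpointDir("hdfs:///tmp/spark-checkpoints")  // assumed path

    val links = sc.parallelize(Seq("a" -> Seq("b"), "b" -> Seq("a"))).cache()
    var ranks = links.mapValues(_ => 1.0)

    for (i <- 1 to 20) {
      val contribs = links.join(ranks).values
        .flatMap { case (urls, rank) => urls.map(url => (url, rank / urls.size)) }
      ranks = contribs.reduceByKey(_ + _).mapValues(0.15 + 0.85 * _)
      if (i % 5 == 0) {      // every few iterations
        ranks.checkpoint()   // cut the lineage back to the checkpoint file
        ranks.count()        // force materialization so the checkpoint happens
      }
    }
    println(ranks.collect().toSeq)
    sc.stop()
  }
}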

Have a look at this : https://www.youtube.com/watch?v=1MWxIUoIYFA

2015-09-16 22:16 GMT+08:00 Ali Hadian :
Thanks for your response, Alexis.

I have seen this page, but its suggested solutions do not work and the tmp 
space still grows linearly after unpersisting RDDs and calling System.gc() 
in each iteration.

I think it might be due to one of the following reasons:

1. System.gc() does not directly invoke the garbage collector; it just 
requests the JVM to run GC, and the JVM usually postpones it until memory is almost 
full. However, since we are just running out of hard-disk space (not 
memory space), GC does not run; therefore the finalize() methods for the 
intermediate RDDs are not triggered.


2. System.gc() is only executed on the driver, but not on the workers (Is it 
how it works??!!)

Any suggestions?

Kind regards
Ali Hadian

-Original Message-
From: Alexis Gillain 
To: Ali Hadian 
Cc: spark users 
Date: Wed, 16 Sep 2015 12:05:35 +0800
Subject: Re: Spark wastes a lot of space (tmp data) for iterative jobs

You can try system.gc() considering that checkpointing is enabled by default 
in graphx :

https://forums.databricks.com/questions/277/how-do-i-avoid-the-no-space-left-on-device-error.html

2015-09-15 22:42 GMT+08:00 Ali Hadian < had...@comp.iust.ac.ir>:
Hi!
We are executing the PageRank example from the Spark java examples package 
on a very large input graph. The code is available here. (Spark's github 
repo).
During the execution, the framework generates a huge amount of intermediate 
data per iteration (i.e. the contribs RDD). The intermediate data is 
temporary, but Spark does not clear the intermediate data of previous 
iterations. That is to say, if we are in the middle of the 20th iteration, all 
of the temporary data of all previous iterations (iterations 0 to 19) is 
still kept in the tmp directory. As a result, the tmp directory grows 
linearly.
It seems rational to keep the data from only the previous iteration, because 
if the current iteration fails, the job can be continued using the 
intermediate data from the previous iteration. Anyways, why does it keep the 
intermediate data for ALL previous iterations???
How can we enforce Spark to clear these intermediate data  during the 
execution of job?

Kind regards, 
Ali hadian




--
Alexis GILLAIN



--
Alexis GILLAIN

Spark Streaming kafka directStream value decoder issue

2015-09-17 Thread srungarapu vamsi
I am using KafkaUtils.createDirectStream to read the data from kafka bus.

On the producer end, I am generating messages in the following way:

props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
  "org.apache.kafka.common.serialization.StringSerializer")
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
  "org.apache.kafka.common.serialization.StringSerializer")
val producer = new KafkaProducer[String, KafkaGenericEvent](props)

// Send some messages
println("Sending message")
val kafkaGenericEvent = new
KafkaGenericEvent("event-id",EventType.site,"6",144050040L)
val message = new ProducerRecord[String,
KafkaGenericEvent](topic,"myKey", kafkaGenericEvent)
producer.send(message)
  }

I am connecting to Kafka using the console consumer script and am able to
see proper data. The KafkaGenericEvent used in the above code is the class
generated using ScalaBuff from a protobuf file.

On the consumer end,
if I read the value as a normal byte array and then convert it into
a KafkaGenericEvent in the following way, I get proper data:

 val kafkaDStream  =
KafkaUtils.createDirectStream[String,Array[Byte],StringDecoder,DefaultDecoder](ssc,
kafkaConf, Set(topics))

kafkaDStream.foreachRDD(rdd=>rdd.collect().map{
  case(devId,byteArray)=>{
println(KafkaGenericEvent.parseFrom(byteArray))
  }
})

But if I change the value type to KafkaGenericEvent and use a custom decoder like
this:

class KafkaGenericEventsDecoder(props: VerifiableProperties = null)
extends Decoder[KafkaGenericEvent]{
 override def fromBytes(bytes:Array[Byte]):KafkaGenericEvent = {
   KafkaGenericEvent.parseFrom(bytes)
 }
}

and in consumer:

val kafkaDStream  =
KafkaUtils.createDirectStream[String,KafkaGenericEvent,StringDecoder,KafkaGenericEventsDecoder](ssc,
kafkaConf, Set(topics))
kafkaDStream foreachRDD(rdd=>rdd.collect().map{
  case(devId,genericEvent)=>{
println(genericEvent)
  }
})

Now my value object KafkaGenericEvent is not created based on the sent
data; instead an empty KafkaGenericEvent object with
default values is created.

Even if I read the value as an array of bytes in createDirectStream and
then apply a transformation in the following way, I am getting incorrect
values:

val kafkaDStream  =
KafkaUtils.createDirectStream[String,Array[Byte],StringDecoder,DefaultDecoder](ssc,
kafkaConf, Set(topics))

kafkaDStream.map{
  case(devId,byteArray) =>(devId,KafkaGenericEvent.parseFrom(byteArray))
} foreachRDD(rdd=>rdd.collect().map{
  case(devId,genericEvent)=>{
println(genericEvent)
  }
})

I get the default KafkaGenericEvent Object in the line println(genericEvent)
Does this mean that I can transform the values only on the driver and not
on the executors?

I am completely confused here!
I am using :
 scala-2.10.4
 spark-1.3.1
 kafka_2.10-0.8.2.1

-
/Vamsi


Re: Spark Streaming kafka directStream value decoder issue

2015-09-17 Thread Saisai Shao
Is your "KafkaGenericEvent" serializable? Since you call rdd.collect() to
fetch the data to local driver, so this KafkaGenericEvent need to be
serialized and deserialized through Java or Kryo (depends on your
configuration) serializer, not sure if it is your problem to always get a
default object.

Also would you provide the implementation of `parseFrom`, so we could
better understand the details of how you do deserialization.

Thanks
Saisai

On Thu, Sep 17, 2015 at 9:49 AM, srungarapu vamsi 
wrote:

> If i understand correctly, i guess you are suggesting me to do this  :
>
> val kafkaDStream  = 
> KafkaUtils.createDirectStream[String,Array[Byte],StringDecoder,DefaultDecoder](ssc,
>  kafkaConf, Set(topics))
>
> kafkaDStream.map{
>   case(devId,byteArray) =>(devId,KafkaGenericEvent.parseFrom(byteArray))
> } foreachRDD(rdd=>rdd.collect().map{
>   case(devId,genericEvent)=>{
> println(genericEvent)
>   }
> })
>
> I read from Kafka as a Byte Array => applied a transformation on the
> byteArray to Custom Class => Printed the custom class for debugging purpose.
>
> But this is not helping me. i.e i am getting an empty object with default
> values when i printed "genericEvent"
>
> Please correct me if i did not get what you are suggesting me to try.
>
>
> On Thu, Sep 17, 2015 at 9:30 PM, Adrian Tanase  wrote:
>
>> I guess what I'm asking is why not start with a Byte array like in the
>> example that works (using the DefaultDecoder) then map over it and do the
>> decoding manually like I'm suggesting below.
>>
>> Have you tried this approach? We have the same workflow (kafka =>
>> protobuf => custom class) and it works.
>> If you expect invalid messages, you can use flatMap instead and wrap
>> .parseFrom in a Try {} .toOption.
>>
>> Sent from my iPhone
>>
>> On 17 Sep 2015, at 18:23, srungarapu vamsi 
>> wrote:
>>
>> @Adrian,
>> I am doing collect for debugging purpose. But i have to use foreachRDD so
>> that i can operate on top of this rdd and eventually save to DB.
>>
>> But my actual problem here is to properly convert Array[Byte] to my
>> custom object.
>>
>> On Thu, Sep 17, 2015 at 7:04 PM, Adrian Tanase  wrote:
>>
>>> Why are you calling foreachRdd / collect in the first place?
>>>
>>> Instead of using a custom decoder, you should simply do – this is code
>>> executed on the workers and allows the computation to continue. ForeachRdd
>>> and collect are output operations and force the data to be collected on the
>>> driver (assuming you don’t want that…)
>>>
>>> val events = kafkaDStream.map { case(devId,byteArray)=> 
>>> KafkaGenericEvent.parseFrom(byteArray) }
>>>
>>> From: srungarapu vamsi
>>> Date: Thursday, September 17, 2015 at 4:03 PM
>>> To: user
>>> Subject: Spark Streaming kafka directStream value decoder issue
>>>
>>> I am using KafkaUtils.createDirectStream to read the data from kafka
>>> bus.
>>>
>>> On the producer end, i am generating in the following way:
>>>
>>> props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
>>> props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
>>>   "org.apache.kafka.common.serialization.StringSerializer")
>>> props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
>>>   "org.apache.kafka.common.serialization.StringSerializer")
>>> val producer = new KafkaProducer[String, KafkaGenericEvent](props)
>>>
>>> // Send some messages
>>> println("Sending message")
>>> val kafkaGenericEvent = new 
>>> KafkaGenericEvent("event-id",EventType.site,"6",144050040L)
>>> val message = new ProducerRecord[String, 
>>> KafkaGenericEvent](topic,"myKey", kafkaGenericEvent)
>>> producer.send(message)
>>>   }
>>>
>>> I am connecting to kafka using the console consumer script and am able
>>> to see proper data. The KafkaGenericEvent used in the above code is  the
>>> class generated using ScalaBuff from a protobuff file.
>>>
>>> On the consumer end,
>>> If i read the value as a normal byte array and the convert it into
>>> KafkaGenericEvent in the following way, i get proper data:
>>>
>>>  val kafkaDStream  = 
>>> KafkaUtils.createDirectStream[String,Array[Byte],StringDecoder,DefaultDecoder](ssc,
>>>  kafkaConf, Set(topics))
>>>
>>> kafkaDStream.foreachRDD(rdd=>rdd.collect().map{
>>>   case(devId,byteArray)=>{
>>> println(KafkaGenericEvent.parseFrom(byteArray))
>>>   }
>>> })
>>>
>>> But if change the value to KafkaGenericEvent and use a custom decoder
>>> like this:
>>>
>>> class KafkaGenericEventsDecoder(props: VerifiableProperties = null) extends 
>>> Decoder[KafkaGenericEvent]{
>>>  override def fromBytes(bytes:Array[Byte]):KafkaGenericEvent = {
>>>KafkaGenericEvent.parseFrom(bytes)
>>>  }
>>> }
>>>
>>> and in consumer:
>>>
>>> val kafkaDStream  = 
>>> KafkaUtils.createDirectStream[String,KafkaGenericEvent,StringDecoder,KafkaGenericEventsDecoder](ssc,

How to add sparkSQL into a standalone application

2015-09-17 Thread Cui Lin
Hello,

I got stuck in adding spark sql into my standalone application.
The build.sbt is defined as:

libraryDependencies += "org.apache.spark" %% "spark-core" % "1.4.1"


I got the following error when building the package:

*[error] /data/workspace/test/src/main/scala/TestMain.scala:6: object
sql is not a member of package org.apache.spark
[error] import org.apache.spark.sql.SQLContext;
[error] ^
[error] /data/workspace/test/src/main/scala/TestMain.scala:19: object
sql is not a member of package org.apache.spark
[error] val sqlContext = new org.apache.spark.sql.SQLContext(sc)
[error]   ^
[error] two errors found
[error] (compile:compile) Compilation failed*


So Spark SQL is not part of the spark-core package? I have no issue when testing
my code in spark-shell. Thanks for the help!



-- 
Best regards!

Lin,Cui


Spark data type guesser UDAF

2015-09-17 Thread Ruslan Dautkhanov
Wanted to take something like this
https://github.com/fitzscott/AirQuality/blob/master/HiveDataTypeGuesser.java
and build a Hive UDAF from it, i.e. an aggregate function that returns a data
type guess.
Am I reinventing the wheel?
Does Spark already have something like this built in?
It would be very useful for exploring new, wide datasets. It would be
helpful for ML too,
e.g. to decide between categorical and numerical variables.


Ruslan


Re: Spark Streaming kafka directStream value decoder issue

2015-09-17 Thread srungarapu vamsi
@Saisai Shao, Thanks for the pointer. It turned out to be a serialization
issue. I was using ScalaBuff to generate my "KafkaGenericEvent" class, but
when I went through the generated class code, I figured out that it is not
serializable.
Now I am generating my classes using ScalaPB (
https://github.com/trueaccord/ScalaPB) and my problem is solved.

Thanks

On Thu, Sep 17, 2015 at 10:43 PM, Saisai Shao 
wrote:

> Is your "KafkaGenericEvent" serializable? Since you call rdd.collect() to
> fetch the data to local driver, so this KafkaGenericEvent need to be
> serialized and deserialized through Java or Kryo (depends on your
> configuration) serializer, not sure if it is your problem to always get a
> default object.
>
> Also would you provide the implementation of `parseFrom`, so we could
> better understand the details of how you do deserialization.
>
> Thanks
> Saisai
>
> On Thu, Sep 17, 2015 at 9:49 AM, srungarapu vamsi <
> srungarapu1...@gmail.com> wrote:
>
>> If i understand correctly, i guess you are suggesting me to do this  :
>>
>> val kafkaDStream  = 
>> KafkaUtils.createDirectStream[String,Array[Byte],StringDecoder,DefaultDecoder](ssc,
>>  kafkaConf, Set(topics))
>>
>> kafkaDStream.map{
>>   case(devId,byteArray) =>(devId,KafkaGenericEvent.parseFrom(byteArray))
>> } foreachRDD(rdd=>rdd.collect().map{
>>   case(devId,genericEvent)=>{
>> println(genericEvent)
>>   }
>> })
>>
>> I read from Kafka as a Byte Array => applied a transformation on the
>> byteArray to Custom Class => Printed the custom class for debugging purpose.
>>
>> But this is not helping me. i.e i am getting an empty object with default
>> values when i printed "genericEvent"
>>
>> Please correct me if i did not get what you are suggesting me to try.
>>
>>
>> On Thu, Sep 17, 2015 at 9:30 PM, Adrian Tanase  wrote:
>>
>>> I guess what I'm asking is why not start with a Byte array like in the
>>> example that works (using the DefaultDecoder) then map over it and do the
>>> decoding manually like I'm suggesting below.
>>>
>>> Have you tried this approach? We have the same workflow (kafka =>
>>> protobuf => custom class) and it works.
>>> If you expect invalid messages, you can use flatMap instead and wrap
>>> .parseFrom in a Try {} .toOption.
>>>
>>> Sent from my iPhone
>>>
>>> On 17 Sep 2015, at 18:23, srungarapu vamsi 
>>> wrote:
>>>
>>> @Adrian,
>>> I am doing collect for debugging purpose. But i have to use foreachRDD
>>> so that i can operate on top of this rdd and eventually save to DB.
>>>
>>> But my actual problem here is to properly convert Array[Byte] to my
>>> custom object.
>>>
>>> On Thu, Sep 17, 2015 at 7:04 PM, Adrian Tanase 
>>> wrote:
>>>
 Why are you calling foreachRdd / collect in the first place?

 Instead of using a custom decoder, you should simply do – this is code
 executed on the workers and allows the computation to continue. ForeachRdd
 and collect are output operations and force the data to be collected on the
 driver (assuming you don’t want that…)

 val events = kafkaDStream.map { case(devId,byteArray)=> 
 KafkaGenericEvent.parseFrom(byteArray) }

 From: srungarapu vamsi
 Date: Thursday, September 17, 2015 at 4:03 PM
 To: user
 Subject: Spark Streaming kafka directStream value decoder issue

 I am using KafkaUtils.createDirectStream to read the data from kafka
 bus.

 On the producer end, i am generating in the following way:

 props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
 props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
   "org.apache.kafka.common.serialization.StringSerializer")
 props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
   "org.apache.kafka.common.serialization.StringSerializer")
 val producer = new KafkaProducer[String, KafkaGenericEvent](props)

 // Send some messages
 println("Sending message")
 val kafkaGenericEvent = new 
 KafkaGenericEvent("event-id",EventType.site,"6",144050040L)
 val message = new ProducerRecord[String, 
 KafkaGenericEvent](topic,"myKey", kafkaGenericEvent)
 producer.send(message)
   }

 I am connecting to kafka using the console consumer script and am able
 to see proper data. The KafkaGenericEvent used in the above code is  the
 class generated using ScalaBuff from a protobuff file.

 On the consumer end,
 If i read the value as a normal byte array and the convert it into
 KafkaGenericEvent in the following way, i get proper data:

  val kafkaDStream  = 
 KafkaUtils.createDirectStream[String,Array[Byte],StringDecoder,DefaultDecoder](ssc,
  kafkaConf, Set(topics))

 kafkaDStream.foreachRDD(rdd=>rdd.collect().map{
   case(devId,byteArray)=>{
 

Re: Spark monitoring

2015-09-17 Thread Pratham Khanna
Thanks, that worked

On Mon, Sep 14, 2015 at 4:54 PM, Akhil Das 
wrote:

> You can write a script to hit the MasterURL:8080/json endpoint to
> retrieve the information. It gives you a response like this:
>
>
> {
>   "url" : "spark://akhldz:7077",
>   "workers" : [ {
> "id" : "worker-20150914165233-0.0.0.0-40324",
> "host" : "0.0.0.0",
> "port" : 40324,
> "webuiaddress" : "http://0.0.0.0:8081;,
> "cores" : 4,
> "coresused" : 0,
> "coresfree" : 4,
> "memory" : 2893,
> "memoryused" : 0,
> "memoryfree" : 2893,
> "state" : "ALIVE",
> "lastheartbeat" : 1442229774880
>   } ],
>   "cores" : 4,
>   "coresused" : 0,
>   "memory" : 2893,
>   "memoryused" : 0,
>   "activeapps" : [ ],
>   "completedapps" : [ ],
>   "activedrivers" : [ ],
>   "status" : "ALIVE"
> }
>
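> A tiny Scala sketch of such a script (master host/port is a placeholder; parse or
> store the returned JSON however you like):
>
> import scala.io.Source
>
> object ClusterUsage {
>   def main(args: Array[String]): Unit = {
>     // Pull the standalone master's JSON endpoint and dump it for later analysis
>     val json = Source.fromURL("http://spark-master:8080/json").mkString
>     println(json)
>   }
> }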
>
> Thanks
> Best Regards
>
> On Fri, Sep 11, 2015 at 11:46 PM, prk77  wrote:
>
>> Is there a way to fetch the current spark cluster memory & cpu usage
>> programmatically ?
>> I know that the default spark master web ui has these details but I want
>> to
>> retrieve them through a program and store them for analysis.
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-monitoring-tp24660.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>


SPARK-SQL parameter tuning for performance

2015-09-17 Thread Sadhan Sood
Hi Spark users,

We are running Spark on YARN and often query table partitions as big as
100~200 GB from HDFS. HDFS is co-located on the same cluster on which Spark
and YARN run. I've noticed much higher I/O read rates when I increase the
number of executor cores from 2 to 8 (most tasks run in RACK_LOCAL and a
few in NODE_LOCAL) while keeping the number of executors constant. The RAM on my
executors is around 24 GB. But the problem is that any subsequent shuffle
stage starts failing if I do that. It runs fine if I leave the number of
executor cores at 2, but then the read is much slower. The error which I get
from YARN is

org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output
> location for shuffle 1
> at
> org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$2.apply(MapOutputTracker.scala:460)


It seems to indicate that YARN is killing executors for using too much memory, but
I can't be sure. I tried increasing spark.sql.shuffle.partitions as
well, but that didn't help much. Is there a way we can run more partition
read tasks per executor while keeping the number of shuffle tasks constant?


Thanks


Re: How to add sparkSQL into a standalone application

2015-09-17 Thread Michael Armbrust
libraryDependencies += "org.apache.spark" %% "spark-sql" % "1.4.1"

Though, I would consider using spark-hive and HiveContext, as the
query parser is more powerful and you'll have access to window
functions and other features.
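
For reference, a bare-bones sketch of the HiveContext route (the spark-hive artifact
and version are assumed to match spark-core; no external Hive install is required):

// build.sbt (assumption): libraryDependencies += "org.apache.spark" %% "spark-hive" % "1.4.1"

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext

object TestMain {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("test"))
    val sqlContext = new HiveContext(sc)  // creates a local metastore automatically
    sqlContext.sql("SHOW TABLES").show()
    sc.stop()
  }
}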


On Thu, Sep 17, 2015 at 10:59 AM, Cui Lin  wrote:

> Hello,
>
> I got stuck in adding spark sql into my standalone application.
> The build.sbt is defined as:
>
> libraryDependencies += "org.apache.spark" %% "spark-core" % "1.4.1"
>
>
> I got the following error when building the package:
>
> *[error] /data/workspace/test/src/main/scala/TestMain.scala:6: object sql is 
> not a member of package org.apache.spark
> [error] import org.apache.spark.sql.SQLContext;
> [error] ^
> [error] /data/workspace/test/src/main/scala/TestMain.scala:19: object sql is 
> not a member of package org.apache.spark
> [error] val sqlContext = new org.apache.spark.sql.SQLContext(sc)
> [error]   ^
> [error] two errors found
> [error] (compile:compile) Compilation failed*
>
>
> So sparksql is not part of spark core package? I have no issue when
> testing my codes in spark-shell. Thanks for the help!
>
>
>
> --
> Best regards!
>
> Lin,Cui
>


Re: How to add sparkSQL into a standalone application

2015-09-17 Thread Michael Armbrust
You don't need to set anything up, it'll create a local hive metastore by
default if you don't explicitly configure one.

On Thu, Sep 17, 2015 at 11:45 AM, Cui Lin  wrote:

> Hi, Michael,
>
> It works to me! Thanks a lot!
> If I use spark-hive or HiveContext, do I have to setup Hive on server? Can
> I run this on my local laptop?
>
> On Thu, Sep 17, 2015 at 11:02 AM, Michael Armbrust  > wrote:
>
>> libraryDependencies += "org.apache.spark" %% "spark-sql" % "1.4.1"
>>
>> Though, I would consider using spark-hive and HiveContext, as the query 
>> parser is more powerful and you'll have access to window functions and other 
>> features.
>>
>>
>> On Thu, Sep 17, 2015 at 10:59 AM, Cui Lin 
>> wrote:
>>
>>> Hello,
>>>
>>> I got stuck in adding spark sql into my standalone application.
>>> The build.sbt is defined as:
>>>
>>> libraryDependencies += "org.apache.spark" %% "spark-core" % "1.4.1"
>>>
>>>
>>> I got the following error when building the package:
>>>
>>> *[error] /data/workspace/test/src/main/scala/TestMain.scala:6: object sql 
>>> is not a member of package org.apache.spark
>>> [error] import org.apache.spark.sql.SQLContext;
>>> [error] ^
>>> [error] /data/workspace/test/src/main/scala/TestMain.scala:19: object sql 
>>> is not a member of package org.apache.spark
>>> [error] val sqlContext = new org.apache.spark.sql.SQLContext(sc)
>>> [error]   ^
>>> [error] two errors found
>>> [error] (compile:compile) Compilation failed*
>>>
>>>
>>> So sparksql is not part of spark core package? I have no issue when
>>> testing my codes in spark-shell. Thanks for the help!
>>>
>>>
>>>
>>> --
>>> Best regards!
>>>
>>> Lin,Cui
>>>
>>
>>
>
>
> --
> Best regards!
>
> Lin,Cui
>


Re: Can we do dataframe.query like Pandas dataframe in spark?

2015-09-17 Thread Michael Armbrust
from pyspark.sql.functions import *

df = sqlContext.range(10).select(rand().alias("a"), rand().alias("b"))

df.where("a > b").show()

+------------------+-------------------+
|                 a|                  b|
+------------------+-------------------+
|0.6697439215581628|0.23420961030968923|
|0.9248996796756386| 0.4146647917936366|
+------------------+-------------------+

On Thu, Sep 17, 2015 at 9:32 AM, Rex X  wrote:

> With a Pandas dataframe, we can do a query:
>
> >>> from numpy.random import randn
> >>> from pandas import DataFrame
> >>> df = DataFrame(randn(10, 2), columns=list('ab'))
> >>> df.query('a > b')
>
>
> This SQL-select-like query is very convenient. Can we do similar thing
> with the new dataframe of spark?
>
>
> Best,
> Rex
>


in joins, does one side stream?

2015-09-17 Thread Koert Kuipers
In Scalding we join with the smaller side on the left, since the smaller
side will get buffered while the bigger side streams through the join.

Looking at CoGroupedRDD, I do not get the impression such a distinction is
made. It seems both sides are put into a map that can spill to disk. Is
this correct?

thanks


Re: How to add sparkSQL into a standalone application

2015-09-17 Thread Cui Lin
Hi, Michael,

It works to me! Thanks a lot!
If I use spark-hive or HiveContext, do I have to setup Hive on server? Can
I run this on my local laptop?

On Thu, Sep 17, 2015 at 11:02 AM, Michael Armbrust 
wrote:

> libraryDependencies += "org.apache.spark" %% "spark-sql" % "1.4.1"
>
> Though, I would consider using spark-hive and HiveContext, as the query 
> parser is more powerful and you'll have access to window functions and other 
> features.
>
>
> On Thu, Sep 17, 2015 at 10:59 AM, Cui Lin  wrote:
>
>> Hello,
>>
>> I got stuck in adding spark sql into my standalone application.
>> The build.sbt is defined as:
>>
>> libraryDependencies += "org.apache.spark" %% "spark-core" % "1.4.1"
>>
>>
>> I got the following error when building the package:
>>
>> *[error] /data/workspace/test/src/main/scala/TestMain.scala:6: object sql is 
>> not a member of package org.apache.spark
>> [error] import org.apache.spark.sql.SQLContext;
>> [error] ^
>> [error] /data/workspace/test/src/main/scala/TestMain.scala:19: object sql is 
>> not a member of package org.apache.spark
>> [error] val sqlContext = new org.apache.spark.sql.SQLContext(sc)
>> [error]   ^
>> [error] two errors found
>> [error] (compile:compile) Compilation failed*
>>
>>
>> So sparksql is not part of spark core package? I have no issue when
>> testing my codes in spark-shell. Thanks for the help!
>>
>>
>>
>> --
>> Best regards!
>>
>> Lin,Cui
>>
>
>


-- 
Best regards!

Lin,Cui


Has anyone used the Twitter API for location filtering?

2015-09-17 Thread Jo Sunad
I've been trying to filter for GeoLocation, Place or even Time Zone and I
keep getting null values. I think I got one Place in 20 minutes of the app
running (without any filters on tweets).

Is this normal? Do I have to try querying rather than filtering?

my code is following TD's example...

val stream = TwitterUtils

val hashtags = stream.map(status => (status.getPlace().getName(),
  status.getText()))

getText, getFollowers, etc all work fine, I just don't get anything
location based (and getLang() for some reason throws a noMethodError).
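
A null-guarded variant of the mapping above (just a sketch; the vast majority of
tweets simply carry no Place, so the resulting stream is expected to be sparse):

val placed = stream
  .filter(status => status.getPlace != null)
  .map(status => (status.getPlace.getName, status.getText))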

Thanks for the help!


Re: Spark on Mesos with Jobs in Cluster Mode Documentation

2015-09-17 Thread Alan Braithwaite
One other piece of information:

We're using zookeeper for persistence and when we brought the dispatcher
back online, it crashed on the same exception after loading the config from
zookeeper.

Cheers,
- Alan

On Thu, Sep 17, 2015 at 12:29 PM, Alan Braithwaite 
wrote:

> Hey All,
>
> To bump this thread once again, I'm having some trouble using the
> dispatcher as well.
>
> I'm using the Mesos Cluster Manager with Docker executors.  I've deployed the
> dispatcher as a Marathon job.  When I submit a job using spark-submit, the
> dispatcher writes back that the submission was successful and then promptly
> dies in Marathon.  Looking at the logs reveals it was hitting the following
> line:
>
> 398:  throw new SparkException("Executor Spark home
> `spark.mesos.executor.home` is not set!")
>
> Which is odd because it's set in multiple places (SPARK_HOME,
> spark.mesos.executor.home, spark.home, etc).  Reading the code, it
> appears that the driver desc pulls only from the request and disregards any
> other properties that may be configured.  Testing by passing --conf
> spark.mesos.executor.home=/usr/local/spark on the command line to
> spark-submit confirms this.  We're trying to isolate the number of places
> where we have to set properties within spark and were hoping that it will
> be possible to have this pull in the spark-defaults.conf from somewhere, or
> at least allow the user to inform the dispatcher through spark-submit that
> those properties will be available once the job starts.
>
> Finally, I don't think the dispatcher should crash in this event.  A
> misconfigured job submission hardly seems like an exceptional condition.
>
> Please direct me on the right path if I'm headed in the wrong direction.
> Also let me know if I should open some tickets for these issues.
>
> Thanks,
> - Alan
>
> On Fri, Sep 11, 2015 at 1:05 PM, Tim Chen  wrote:
>
>> Yes you can create an issue, or actually contribute a patch to update it
>> :)
>>
>> Sorry the docs is a bit light, I'm going to make it more complete along
>> the way.
>>
>> Tim
>>
>>
>> On Fri, Sep 11, 2015 at 11:11 AM, Tom Waterhouse (tomwater) <
>> tomwa...@cisco.com> wrote:
>>
>>> Tim,
>>>
>>> Thank you for the explanation.  You are correct, my Mesos experience is
>>> very light, and I haven’t deployed anything via Marathon yet.  What you
>>> have stated here makes sense, I will look into doing this.
>>>
>>> Adding this info to the docs would be great.  Is the appropriate action
>>> to create an issue regarding improvement of the docs?  For those of us who
>>> are gaining the experience having such a pointer is very helpful.
>>>
>>> Tom
>>>
>>> From: Tim Chen 
>>> Date: Thursday, September 10, 2015 at 10:25 AM
>>> To: Tom Waterhouse 
>>> Cc: "user@spark.apache.org" 
>>> Subject: Re: Spark on Mesos with Jobs in Cluster Mode Documentation
>>>
>>> Hi Tom,
>>>
>>> Sorry the documentation isn't really rich, since it's probably assuming
>>> users understands how Mesos and framework works.
>>>
>>> First I need to explain the rationale for creating the dispatcher. If
>>> you're not familiar with Mesos yet, each node in your datacenter has a
>>> Mesos slave installed that is responsible for publishing resources and
>>> running/watching tasks, and the Mesos master is responsible for taking the
>>> aggregated resources and scheduling them among frameworks.
>>>
>>> Frameworks are not managed by Mesos: the Mesos master/slave doesn't
>>> launch and maintain frameworks but assumes they're launched and kept running
>>> on their own. All the existing frameworks in the ecosystem therefore have
>>> their own ways to deploy, handle HA and persist state (e.g. Aurora, Marathon, etc.).
>>>
>>> Therefore, to introduce cluster mode with Mesos, we must create a
>>> framework that is long running that can be running in your datacenter, and
>>> can handle launching spark drivers on demand and handle HA, etc. This is
>>> what the dispatcher is all about.
>>>
>>> So the idea is that you should launch the dispatcher not on the client,
>>> but on a machine in your datacenter. In Mesosphere's DCOS we launch all
>>> frameworks and long running services with Marathon, and you can use
>>> Marathon to launch the Spark dispatcher.
>>>
>>> Then all clients instead of specifying the Mesos master URL (e.g:
>>> mesos://mesos.master:2181), then just talks to the dispatcher only
>>> (mesos://spark-dispatcher.mesos:7077), and the dispatcher will then start
>>> and watch the driver for you.
>>>
>>> Tim
>>>
>>>
>>>
>>> On Thu, Sep 10, 2015 at 10:13 AM, Tom Waterhouse (tomwater) <
>>> tomwa...@cisco.com> wrote:
>>>
 After spending most of yesterday scouring the Internet for sources of
 documentation for submitting Spark jobs in cluster mode to a Spark cluster
 managed by Mesos I was able to do just that, but I am not convinced that
 how I have things setup is correct.

 I used the Mesos 

Re: WAL on S3

2015-09-17 Thread Ted Yu
I assume you don't use Kinesis.

Are you running Spark 1.5.0 ?
If you must use S3, is switching to Kinesis possible ?

Cheers

On Thu, Sep 17, 2015 at 1:09 PM, Michal Čizmazia  wrote:

> How to make Write Ahead Logs to work with S3? Any pointers welcome!
>
> It seems as a known issue:
> https://issues.apache.org/jira/browse/SPARK-9215
>
> I am getting this exception when reading write ahead log:
>
> Exception in thread "main" org.apache.spark.SparkException: Job aborted
> due to stage failure: Task 0 in stage 5.0 failed 1 times, most recent
> failure: Lost task 0.0 in stage 5.0 (TID 14, localhost):
> org.apache.spark.SparkException: Could not read data from write ahead log
> record
> FileBasedWriteAheadLogSegment(s3ax://test/test/0/log-1442512871968-1442512931968,0,1721)
> at org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD.org
> $apache$spark$streaming$rdd$WriteAheadLogBackedBlockRDD$$getBlockFromWriteAheadLog$1(WriteAheadLogBackedBlockRDD.scala:144)
> at
> org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD.compute(WriteAheadLogBackedBlockRDD.scala:170)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
> at
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
> at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:70)
> at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
> at org.apache.spark.scheduler.Task.run(Task.scala:70)
> at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.NullPointerException
> at
> org.apache.spark.streaming.util.FileBasedWriteAheadLog.read(FileBasedWriteAheadLog.scala:106)
> at org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD.org
> $apache$spark$streaming$rdd$WriteAheadLogBackedBlockRDD$$getBlockFromWriteAheadLog$1(WriteAheadLogBackedBlockRDD.scala:141)
> ... 13 more
>
>
>


Re: Spark Streaming kafka directStream value decoder issue

2015-09-17 Thread Adrian Tanase
Good catch!

BTW, great choice with ScalaPB, we moved from scalabuff as well, in order to 
generate the classes at compile time from sbt.

Sent from my iPhone

On 17 Sep 2015, at 22:00, srungarapu vamsi 
> wrote:

@Saisai Shao, Thanks for the pointer. It turned out to be the serialization 
issue. I was using scalabuff to generate my "KafkaGenericEvent" class. But when 
i went through the generated class code, i figured out that it is not 
serializable.
Now i am generating my classes using scalapb 
(https://github.com/trueaccord/ScalaPB) and my problem is solved.

Thanks


On Thu, Sep 17, 2015 at 10:43 PM, Saisai Shao 
> wrote:
Is your "KafkaGenericEvent" serializable? Since you call rdd.collect() to fetch 
the data to local driver, so this KafkaGenericEvent need to be serialized and 
deserialized through Java or Kryo (depends on your configuration) serializer, 
not sure if it is your problem to always get a default object.

Also would you provide the implementation of `parseFrom`, so we could better 
understand the details of how you do deserialization.

Thanks
Saisai

On Thu, Sep 17, 2015 at 9:49 AM, srungarapu vamsi 
> wrote:
If i understand correctly, i guess you are suggesting me to do this  :

val kafkaDStream  = 
KafkaUtils.createDirectStream[String,Array[Byte],StringDecoder,DefaultDecoder](ssc,
 kafkaConf, Set(topics))

kafkaDStream.map{
  case(devId,byteArray) =>(devId,KafkaGenericEvent.parseFrom(byteArray))
} foreachRDD(rdd=>rdd.collect().map{
  case(devId,genericEvent)=>{
println(genericEvent)
  }
})

I read from Kafka as a Byte Array => applied a transformation on the byteArray 
to Custom Class => Printed the custom class for debugging purpose.

But this is not helping me. i.e i am getting an empty object with default 
values when i printed "genericEvent"

Please correct me if i did not get what you are suggesting me to try.



On Thu, Sep 17, 2015 at 9:30 PM, Adrian Tanase 
> wrote:
I guess what I'm asking is why not start with a Byte array like in the example 
that works (using the DefaultDecoder) then map over it and do the decoding 
manually like I'm suggesting below.

Have you tried this approach? We have the same workflow (kafka => protobuf => 
custom class) and it works.
If you expect invalid messages, you can use flatMap instead and wrap 
.parseFrom in a Try {}.toOption.

Sent from my iPhone

On 17 Sep 2015, at 18:23, srungarapu vamsi 
> wrote:

@Adrian,
I am doing collect for debugging purpose. But i have to use foreachRDD so that 
i can operate on top of this rdd and eventually save to DB.

But my actual problem here is to properly convert Array[Byte] to my custom 
object.


On Thu, Sep 17, 2015 at 7:04 PM, Adrian Tanase 
> wrote:
Why are you calling foreachRdd / collect in the first place?

Instead of using a custom decoder, you should simply do – this is code executed 
on the workers and allows the computation to continue. ForeachRdd and collect 
are output operations and force the data to be collected on the driver 
(assuming you don’t want that…)

val events = kafkaDStream.map { case(devId,byteArray)=> 
KafkaGenericEvent.parseFrom(byteArray) }

From: srungarapu vamsi
Date: Thursday, September 17, 2015 at 4:03 PM
To: user
Subject: Spark Streaming kafka directStream value decoder issue

I am using KafkaUtils.createDirectStream to read the data from kafka bus.

On the producer end, i am generating in the following way:

props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
  "org.apache.kafka.common.serialization.StringSerializer")
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
  "org.apache.kafka.common.serialization.StringSerializer")
val producer = new KafkaProducer[String, KafkaGenericEvent](props)

// Send some messages
println("Sending message")
val kafkaGenericEvent = new 
KafkaGenericEvent("event-id",EventType.site,"6",144050040L)
val message = new ProducerRecord[String, 
KafkaGenericEvent](topic,"myKey", kafkaGenericEvent)
producer.send(message)
  }

I am connecting to kafka using the console consumer script and am able to see 
proper data. The KafkaGenericEvent used in the above code is  the class 
generated using ScalaBuff from a protobuff file.

On the consumer end,
If i read the value as a normal byte array and the convert it into 
KafkaGenericEvent in the following way, i get proper data:

 val kafkaDStream  = 
KafkaUtils.createDirectStream[String,Array[Byte],StringDecoder,DefaultDecoder](ssc,
 kafkaConf, Set(topics))

kafkaDStream.foreachRDD(rdd=>rdd.collect().map{
  

Stopping criteria for gradient descent

2015-09-17 Thread nishanthps
Hi,

I am running LogisticRegressionWithSGD in Spark 1.4.1 and it always takes
100 iterations to train (which is the default). It never meets the
convergence criteria. Shouldn't the convergence criteria for SGD be based on the
difference in log loss, or the difference in accuracy on a held-out test set,
instead of the difference in weight vectors?

Code for convergence criteria:
https://github.com/apache/spark/blob/c0e9ff1588b4d9313cc6ec6e00e5c7663eb67910/mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescent.scala#L251

Thanks,
Nishanth



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Stopping-criteria-for-gradient-descent-tp24727.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: WAL on S3

2015-09-17 Thread Tathagata Das
Actually, the current WAL implementation (as of Spark 1.5) does not work
with S3 because S3 does not support flushing. Basically, the current
implementation assumes that after write + flush, the data is immediately
durable, and readable if the system crashes without closing the WAL file.
This does not work with S3, as data is durable if and only if the S3 file
output stream is cleanly closed.




On Thu, Sep 17, 2015 at 1:30 PM, Ted Yu  wrote:

> I assume you don't use Kinesis.
>
> Are you running Spark 1.5.0 ?
> If you must use S3, is switching to Kinesis possible ?
>
> Cheers
>
> On Thu, Sep 17, 2015 at 1:09 PM, Michal Čizmazia 
> wrote:
>
>> How to make Write Ahead Logs to work with S3? Any pointers welcome!
>>
>> It seems as a known issue:
>> https://issues.apache.org/jira/browse/SPARK-9215
>>
>> I am getting this exception when reading write ahead log:
>>
>> Exception in thread "main" org.apache.spark.SparkException: Job aborted
>> due to stage failure: Task 0 in stage 5.0 failed 1 times, most recent
>> failure: Lost task 0.0 in stage 5.0 (TID 14, localhost):
>> org.apache.spark.SparkException: Could not read data from write ahead log
>> record
>> FileBasedWriteAheadLogSegment(s3ax://test/test/0/log-1442512871968-1442512931968,0,1721)
>> at org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD.org
>> $apache$spark$streaming$rdd$WriteAheadLogBackedBlockRDD$$getBlockFromWriteAheadLog$1(WriteAheadLogBackedBlockRDD.scala:144)
>> at
>> org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD.compute(WriteAheadLogBackedBlockRDD.scala:170)
>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>> at
>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>> at
>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:70)
>> at
>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>> at org.apache.spark.scheduler.Task.run(Task.scala:70)
>> at
>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>> at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>> at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>> at java.lang.Thread.run(Thread.java:745)
>> Caused by: java.lang.NullPointerException
>> at
>> org.apache.spark.streaming.util.FileBasedWriteAheadLog.read(FileBasedWriteAheadLog.scala:106)
>> at org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD.org
>> $apache$spark$streaming$rdd$WriteAheadLogBackedBlockRDD$$getBlockFromWriteAheadLog$1(WriteAheadLogBackedBlockRDD.scala:141)
>> ... 13 more
>>
>>
>>
>


KafkaDirectStream can't be recovered from checkpoint

2015-09-17 Thread Petr Novak
Hi all,
it throws FileBasedWriteAheadLogReader: Error reading next item, EOF reached
java.io.EOFException
  at java.io.DataInputStream.readInt(DataInputStream.java:392)
  at
org.apache.spark.streaming.util.FileBasedWriteAheadLogReader.hasNext(FileBasedWriteAheadLogReader.scala:47)

WAL is not enabled in config, it is default, hence false.

The code follows the examples and is quite simple, just for testing (I'm aware that the file
save isn't idempotent). Or do I have something wrong there? It was tried on
Spark 1.5.0.

import kafka.serializer.StringDecoder

import org.apache.spark.SparkConf
import org.apache.spark.sql.SQLContext
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object Loader {
  def main(args: Array[String]): Unit = {

val checkpointDir = "/dfs/spark/checkpoints"

val sparkConf = new SparkConf()
  .setAppName("Spark Loader")
  .setIfMissing("spark.master", "local[2]")
  .setIfMissing("spark.streaming.kafka.maxRatePerPartition", "1000")

val ssc = StreamingContext.getOrCreate(
  checkpointDir,
  createStreamingContext(sparkConf, checkpointDir))

ssc.start()
ssc.awaitTermination()
  }

  def createStreamingContext(conf: SparkConf, checkpointDir:
String)(): StreamingContext = {
val ssc = new StreamingContext(conf, Seconds(60))

val sc = ssc.sparkContext
val sqlc = new SQLContext(sc)

ssc.checkpoint(checkpointDir)

import sqlc.implicits._

val kafkaParams = Map[String, String](
  "metadata.broker.list" -> "tesla1:9092,tesla2:9092,tesla3:9092",
  "auto.offset.reset" -> "smallest")

val topics = Set("topic-p03-r01")

val stream = KafkaUtils.createDirectStream[String, String,
StringDecoder, StringDecoder](
ssc, kafkaParams, topics)

stream
  .checkpoint(Seconds(60))
  .foreachRDD { (rdd, time) =>
  rdd.toDF()
.write
.json(s"/dfs/spark/agg/${time.milliseconds / 1000}")
}

ssc
  }
}


Many thanks for any idea,
Petr


Creating BlockMatrix with java API

2015-09-17 Thread Pulasthi Supun Wickramasinghe
Hi All,

I am new to Spark and I am trying to do some BlockMatrix operations with
the MLlib APIs, but I can't seem to create a BlockMatrix with the Java
API. I tried the following:

Matrix matrixa = Matrices.rand(4, 4, new Random(1000));
List<Tuple2<Tuple2<Integer, Integer>, Matrix>> list = new
ArrayList<Tuple2<Tuple2<Integer, Integer>, Matrix>>();
Tuple2<Integer, Integer> intTuple = new Tuple2<Integer, Integer>(0, 0);
Tuple2<Tuple2<Integer, Integer>, Matrix> tuple2MatrixTuple2 = new
Tuple2<Tuple2<Integer, Integer>, Matrix>(intTuple, matrixa);
list.add(tuple2MatrixTuple2);
JavaRDD<Tuple2<Tuple2<Integer, Integer>, Matrix>> rdd = sc.parallelize(list);

BlockMatrix blockMatrix = new BlockMatrix(rdd, 2, 2);


but since the BlockMatrix constructor only takes
"RDD<Tuple2<Tuple2<Object, Object>, Matrix>>",
this code does not work: sc.parallelize() returns a JavaRDD, so the two are
not compatible. I also couldn't find any code samples for this. Any help on
this would be highly appreciated.

Best Regards,
Pulasthi
-- 
Pulasthi S. Wickramasinghe
Graduate Student  | Research Assistant
School of Informatics and Computing | Digital Science Center
Indiana University, Bloomington
cell: 224-386-9035


selected field not getting pushed down into my DataSource?

2015-09-17 Thread Timothy Potter
I'm using Spark 1.4.1 and am doing the following with spark-shell:

val solr = sqlContext.read.format("solr").option("zkhost",
  "localhost:2181").option("collection", "spark").load()

solr.select("id").count()

The Solr DataSource implements PrunedFilteredScan so I expected the
buildScan method to get called with the "id" field in the fields list.
However, "id" is not getting passed to my DataSource.

Shouldn't selected fields get passed to buildScan?

Thanks.
Tim
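
For reference, a minimal sketch of the PrunedFilteredScan contract in question, using a hypothetical relation (not the Solr data source) that just logs what Spark passes in:

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.sources.{BaseRelation, Filter, PrunedFilteredScan}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

// A hypothetical relation used only to log what Spark passes to buildScan.
class LoggingRelation(override val sqlContext: SQLContext)
  extends BaseRelation with PrunedFilteredScan {

  override def schema: StructType =
    StructType(Seq(StructField("id", StringType), StructField("body", StringType)))

  // requiredColumns should contain only the selected fields, e.g. Array("id")
  override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {
    println(s"requiredColumns=${requiredColumns.mkString(",")} filters=${filters.mkString(",")}")
    sqlContext.sparkContext.emptyRDD[Row]
  }
}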

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: KafkaDirectStream can't be recovered from checkpoint

2015-09-17 Thread Cody Koeninger
Is there a particular reason you're calling checkpoint on the stream in
addition to the streaming context?

On Thu, Sep 17, 2015 at 2:36 PM, Petr Novak  wrote:

> Hi all,
> it throws FileBasedWriteAheadLogReader: Error reading next item, EOF
> reached
> java.io.EOFException
>   at java.io.DataInputStream.readInt(DataInputStream.java:392)
>   at
> org.apache.spark.streaming.util.FileBaseWriteAheadLogReader.hasNext(FileBasedWriteAheadLogReader.scala:47)
>
> WAL is not enabled in config, it is default, hence false.
>
> The code is by example and quite simple for testing (I'm aware that file
> save isn't idempotent). Or do I have something wrong there? It was tried on
> Spark 1.5.0.
>
> object Loader {
>   def main(args: Array[String]): Unit = {
>
> val checkpointDir = "/dfs/spark/checkpoints"
>
> val sparkConf = new SparkConf()
>   .setAppName("Spark Loader")
>   .setIfMissing("spark.master", "local[2]")
>   .setIfMissing("spark.streaming.kafka.maxRatePerPartition", "1000")
>
> val ssc = StreamingContext.getOrCreate(
>   checkpointDir,
>   createStreamingContext(sparkConf, checkpointDir))
>
> ssc.start()
> ssc.awaitTermination()
>   }
>
>   def createStreamingContext(conf: SparkConf, checkpointDir: String)(): 
> StreamingContext = {
> val ssc = new StreamingContext(conf, Seconds(60))
>
> val sc = ssc.sparkContext
> val sqlc = new SQLContext(sc)
>
> ssc.checkpoint(checkpointDir)
>
> import sqlc.implicits._
>
> val kafkaParams = Map[String, String](
>   "metadata.broker.list" -> "tesla1:9092,tesla2:9092,tesla3:9092",
>   "auto.offset.reset" -> "smallest")
>
> val topics = Set("topic-p03-r01")
>
> val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, 
> StringDecoder](
> ssc, kafkaParams, topics)
>
> stream
>   .checkpoint(Seconds(60))
>   .foreachRDD { (rdd, time) =>
>   rdd.toDF()
> .write
> .json(s"/dfs/spark/agg/${time.milliseconds / 1000}")
> }
>
> ssc
>   }
> }
>
>
> Many thanks for any idea,
> Petr
>


WAL on S3

2015-09-17 Thread Michal Čizmazia
How can I make Write Ahead Logs work with S3? Any pointers welcome!

It seems to be a known issue: https://issues.apache.org/jira/browse/SPARK-9215

I am getting this exception when reading write ahead log:

Exception in thread "main" org.apache.spark.SparkException: Job aborted due
to stage failure: Task 0 in stage 5.0 failed 1 times, most recent failure:
Lost task 0.0 in stage 5.0 (TID 14, localhost):
org.apache.spark.SparkException: Could not read data from write ahead log
record
FileBasedWriteAheadLogSegment(s3ax://test/test/0/log-1442512871968-1442512931968,0,1721)
at org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD.org
$apache$spark$streaming$rdd$WriteAheadLogBackedBlockRDD$$getBlockFromWriteAheadLog$1(WriteAheadLogBackedBlockRDD.scala:144)
at
org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD.compute(WriteAheadLogBackedBlockRDD.scala:170)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:70)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:70)
at
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.NullPointerException
at
org.apache.spark.streaming.util.FileBasedWriteAheadLog.read(FileBasedWriteAheadLog.scala:106)
at org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD.org
$apache$spark$streaming$rdd$WriteAheadLogBackedBlockRDD$$getBlockFromWriteAheadLog$1(WriteAheadLogBackedBlockRDD.scala:141)
... 13 more


Re: Spark on Mesos with Jobs in Cluster Mode Documentation

2015-09-17 Thread Alan Braithwaite
Hey All,

To bump this thread once again, I'm having some trouble using the
dispatcher as well.

I'm using the Mesos Cluster Manager with Docker executors.  I've deployed the
dispatcher as a Marathon job.  When I submit a job using spark-submit, the
dispatcher writes back that the submission was successful and then promptly
dies in Marathon.  Looking at the logs reveals it was hitting the following
line:

398:  throw new SparkException("Executor Spark home
`spark.mesos.executor.home` is not set!")

Which is odd because it's set in multiple places (SPARK_HOME,
spark.mesos.executor.home, spark.home, etc).  Reading the code, it appears
that the driver desc pulls only from the request and disregards any other
properties that may be configured.  Testing by passing --conf
spark.mesos.executor.home=/usr/local/spark on the command line to
spark-submit confirms this.  We're trying to isolate the number of places
where we have to set properties within spark and were hoping that it will
be possible to have this pull in the spark-defaults.conf from somewhere, or
at least allow the user to inform the dispatcher through spark-submit that
those properties will be available once the job starts.

Finally, I don't think the dispatcher should crash in this event.  It seems
not exceptional that a job is misconfigured when submitted.

Please direct me on the right path if I'm headed in the wrong direction.
Also let me know if I should open some tickets for these issues.

Thanks,
- Alan

On Fri, Sep 11, 2015 at 1:05 PM, Tim Chen  wrote:

> Yes you can create an issue, or actually contribute a patch to update it :)
>
> Sorry the docs is a bit light, I'm going to make it more complete along
> the way.
>
> Tim
>
>
> On Fri, Sep 11, 2015 at 11:11 AM, Tom Waterhouse (tomwater) <
> tomwa...@cisco.com> wrote:
>
>> Tim,
>>
>> Thank you for the explanation.  You are correct, my Mesos experience is
>> very light, and I haven’t deployed anything via Marathon yet.  What you
>> have stated here makes sense, I will look into doing this.
>>
>> Adding this info to the docs would be great.  Is the appropriate action
>> to create an issue regarding improvement of the docs?  For those of us who
>> are gaining the experience having such a pointer is very helpful.
>>
>> Tom
>>
>> From: Tim Chen 
>> Date: Thursday, September 10, 2015 at 10:25 AM
>> To: Tom Waterhouse 
>> Cc: "user@spark.apache.org" 
>> Subject: Re: Spark on Mesos with Jobs in Cluster Mode Documentation
>>
>> Hi Tom,
>>
>> Sorry the documentation isn't really rich, since it's probably assuming
>> users understands how Mesos and framework works.
>>
>> First I need explain the rationale of why create the dispatcher. If
>> you're not familiar with Mesos yet, each node in your datacenter is
>> installed a Mesos slave where it's responsible for publishing resources and
>> running/watching tasks, and Mesos master is responsible for taking the
>> aggregated resources and scheduling them among frameworks.
>>
>> Frameworks are not managed by Mesos, as Mesos master/slave doesn't launch
>> and maintain framework but assume they're launched and kept running on its
>> own. All the existing frameworks in the ecosystem therefore all have their
>> own ways to deploy, HA and persist state (e.g: Aurora, Marathon, etc).
>>
>> Therefore, to introduce cluster mode with Mesos, we must create a
>> framework that is long running that can be running in your datacenter, and
>> can handle launching spark drivers on demand and handle HA, etc. This is
>> what the dispatcher is all about.
>>
>> So the idea is that you should launch the dispatcher not on the client,
>> but on a machine in your datacenter. In Mesosphere's DCOS we launch all
>> frameworks and long running services with Marathon, and you can use
>> Marathon to launch the Spark dispatcher.
>>
>> Then all clients instead of specifying the Mesos master URL (e.g:
>> mesos://mesos.master:2181), then just talks to the dispatcher only
>> (mesos://spark-dispatcher.mesos:7077), and the dispatcher will then start
>> and watch the driver for you.
>>
>> Tim
>>
>>
>>
>> On Thu, Sep 10, 2015 at 10:13 AM, Tom Waterhouse (tomwater) <
>> tomwa...@cisco.com> wrote:
>>
>>> After spending most of yesterday scouring the Internet for sources of
>>> documentation for submitting Spark jobs in cluster mode to a Spark cluster
>>> managed by Mesos I was able to do just that, but I am not convinced that
>>> how I have things setup is correct.
>>>
>>> I used the Mesos published
>>> 
>>> instructions for setting up my Mesos cluster.  I have three Zookeeper
>>> instances, three Mesos master instances, and three Mesos slave instances.
>>> This is all running in Openstack.
>>>
>>> The documentation on the Spark documentation site states that “To use
>>> cluster mode, you must start the MesosClusterDispatcher in your cluster via
>>> 

Create view on nested JSON doesn't recognize column names

2015-09-17 Thread Dan LaBar
I’m trying to create a view on a nested JSON file (converted to a dict)
using PySpark 1.4.1. The SQL looks like this:

create view myView as
select myColA, myStruct.ColB, myStruct.nestedColC
from myTbl
where myColD = "some value";

The select statement by itself runs fine, but when I try to create the view
I get the following error:

SQL Error [500051] [HY000]: [Simba][HiveJDBCDriver](500051) ERROR
processing query/statement. Error Code: 0, SQL state:
TStatus(statusCode:ERROR_STATUS, errorCode:0,
errorMessage:org.apache.spark.sql.execution.QueryExecutionException:
FAILED: SemanticException [Error 10004]: Line 2:7 Invalid table alias
or column reference 'myColA': (possible column names are: col)), ...

I get a similar error using the Hive Context from PySpark:

An error occurred while calling o111.sql.
: org.apache.spark.sql.execution.QueryExecutionException: FAILED:
SemanticException [Error 10004]: Line 1:133 Invalid table alias or
column reference 'myColD': (possible column names are: col)
at 
org.apache.spark.sql.hive.client.ClientWrapper$$anonfun$runHive$1.apply(ClientWrapper.scala:349)
at 
org.apache.spark.sql.hive.client.ClientWrapper$$anonfun$runHive$1.apply(ClientWrapper.scala:326)
at 
org.apache.spark.sql.hive.client.ClientWrapper.withHiveState(ClientWrapper.scala:155)
at 
org.apache.spark.sql.hive.client.ClientWrapper.runHive(ClientWrapper.scala:326)
at 
org.apache.spark.sql.hive.client.ClientWrapper.runSqlHive(ClientWrapper.scala:316)
at org.apache.spark.sql.hive.HiveContext.runSqlHive(HiveContext.scala:473)
at 
org.apache.spark.sql.hive.execution.HiveNativeCommand.run(HiveNativeCommand.scala:33)
at 
org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:57)
at 
org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult(commands.scala:57)
at 
org.apache.spark.sql.execution.ExecutedCommand.doExecute(commands.scala:68)
at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:88)
at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:88)
at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:87)
at 
org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:950)
at 
org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:950)
at org.apache.spark.sql.DataFrame.(DataFrame.scala:144)
at org.apache.spark.sql.DataFrame.(DataFrame.scala:128)
at org.apache.spark.sql.DataFrame$.apply(DataFrame.scala:51)
at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:755)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
at py4j.Gateway.invoke(Gateway.java:259)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:207)
at java.lang.Thread.run(Thread.java:745)

What am I doing wrong, or is this perhaps a bug?

Thanks,
Dan


Re: WAL on S3

2015-09-17 Thread Michal Čizmazia
Please could you explain how to use pluggable WAL?

After I implement the WriteAheadLog abstract class, how can I use it? I
want to use it with a Custom Reliable Receiver. I am using Spark 1.4.1.

Thanks!


On 17 September 2015 at 16:40, Tathagata Das  wrote:

> Actually, the current WAL implementation (as of Spark 1.5) does not work
> with S3 because S3 does not support flushing. Basically, the current
> implementation assumes that after write + flush, the data is immediately
> durable, and readable if the system crashes without closing the WAL file.
> This does not work with S3 as data is durable only and only if the S3 file
> output stream is cleanly closed.
>
>
>
>
> On Thu, Sep 17, 2015 at 1:30 PM, Ted Yu  wrote:
>
>> I assume you don't use Kinesis.
>>
>> Are you running Spark 1.5.0 ?
>> If you must use S3, is switching to Kinesis possible ?
>>
>> Cheers
>>
>> On Thu, Sep 17, 2015 at 1:09 PM, Michal Čizmazia 
>> wrote:
>>
>>> How to make Write Ahead Logs to work with S3? Any pointers welcome!
>>>
>>> It seems as a known issue:
>>> https://issues.apache.org/jira/browse/SPARK-9215
>>>
>>> I am getting this exception when reading write ahead log:
>>>
>>> Exception in thread "main" org.apache.spark.SparkException: Job aborted
>>> due to stage failure: Task 0 in stage 5.0 failed 1 times, most recent
>>> failure: Lost task 0.0 in stage 5.0 (TID 14, localhost):
>>> org.apache.spark.SparkException: Could not read data from write ahead log
>>> record
>>> FileBasedWriteAheadLogSegment(s3ax://test/test/0/log-1442512871968-1442512931968,0,1721)
>>> at
>>> org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD.org
>>> $apache$spark$streaming$rdd$WriteAheadLogBackedBlockRDD$$getBlockFromWriteAheadLog$1(WriteAheadLogBackedBlockRDD.scala:144)
>>> at
>>> org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD.compute(WriteAheadLogBackedBlockRDD.scala:170)
>>> at
>>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>>> at
>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>>> at
>>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>>> at
>>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:70)
>>> at
>>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>>> at org.apache.spark.scheduler.Task.run(Task.scala:70)
>>> at
>>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>>> at
>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>> at
>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>> at java.lang.Thread.run(Thread.java:745)
>>> Caused by: java.lang.NullPointerException
>>> at
>>> org.apache.spark.streaming.util.FileBasedWriteAheadLog.read(FileBasedWriteAheadLog.scala:106)
>>> at
>>> org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD.org
>>> $apache$spark$streaming$rdd$WriteAheadLogBackedBlockRDD$$getBlockFromWriteAheadLog$1(WriteAheadLogBackedBlockRDD.scala:141)
>>> ... 13 more
>>>
>>>
>>>
>>
>


Re: WAL on S3

2015-09-17 Thread Tathagata Das
You could override the spark conf called
"spark.streaming.receiver.writeAheadLog.class" with the class name.
https://github.com/apache/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogUtils.scala#L30
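
For reference, a minimal sketch of wiring in such a custom WAL (the class name com.example.MyS3WriteAheadLog is a hypothetical placeholder for your own subclass of org.apache.spark.streaming.util.WriteAheadLog):

import org.apache.spark.SparkConf

// com.example.MyS3WriteAheadLog is a hypothetical placeholder for a custom
// subclass of org.apache.spark.streaming.util.WriteAheadLog.
val conf = new SparkConf()
  .setAppName("custom-wal-app")
  // enable the receiver WAL and point Spark at the custom implementation
  .set("spark.streaming.receiver.writeAheadLog.enable", "true")
  .set("spark.streaming.receiver.writeAheadLog.class",
    "com.example.MyS3WriteAheadLog")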

On Thu, Sep 17, 2015 at 2:04 PM, Michal Čizmazia  wrote:

> Please could you explain how to use pluggable WAL?
>
> After I implement the WriteAheadLog abstract class, how can I use it? I
> want to use it with a Custom Reliable Receiver. I am using Spark 1.4.1.
>
> Thanks!
>
>
> On 17 September 2015 at 16:40, Tathagata Das  wrote:
>
>> Actually, the current WAL implementation (as of Spark 1.5) does not work
>> with S3 because S3 does not support flushing. Basically, the current
>> implementation assumes that after write + flush, the data is immediately
>> durable, and readable if the system crashes without closing the WAL file.
>> This does not work with S3 as data is durable only and only if the S3 file
>> output stream is cleanly closed.
>>
>>
>>
>>
>> On Thu, Sep 17, 2015 at 1:30 PM, Ted Yu  wrote:
>>
>>> I assume you don't use Kinesis.
>>>
>>> Are you running Spark 1.5.0 ?
>>> If you must use S3, is switching to Kinesis possible ?
>>>
>>> Cheers
>>>
>>> On Thu, Sep 17, 2015 at 1:09 PM, Michal Čizmazia 
>>> wrote:
>>>
 How to make Write Ahead Logs to work with S3? Any pointers welcome!

 It seems as a known issue:
 https://issues.apache.org/jira/browse/SPARK-9215

 I am getting this exception when reading write ahead log:

 Exception in thread "main" org.apache.spark.SparkException: Job aborted
 due to stage failure: Task 0 in stage 5.0 failed 1 times, most recent
 failure: Lost task 0.0 in stage 5.0 (TID 14, localhost):
 org.apache.spark.SparkException: Could not read data from write ahead log
 record
 FileBasedWriteAheadLogSegment(s3ax://test/test/0/log-1442512871968-1442512931968,0,1721)
 at
 org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD.org
 $apache$spark$streaming$rdd$WriteAheadLogBackedBlockRDD$$getBlockFromWriteAheadLog$1(WriteAheadLogBackedBlockRDD.scala:144)
 at
 org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD.compute(WriteAheadLogBackedBlockRDD.scala:170)
 at
 org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
 at
 org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
 at
 org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
 at
 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:70)
 at
 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
 at org.apache.spark.scheduler.Task.run(Task.scala:70)
 at
 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
 at
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
 at
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
 at java.lang.Thread.run(Thread.java:745)
 Caused by: java.lang.NullPointerException
 at
 org.apache.spark.streaming.util.FileBasedWriteAheadLog.read(FileBasedWriteAheadLog.scala:106)
 at
 org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD.org
 $apache$spark$streaming$rdd$WriteAheadLogBackedBlockRDD$$getBlockFromWriteAheadLog$1(WriteAheadLogBackedBlockRDD.scala:141)
 ... 13 more



>>>
>>
>


Re: Spark on Mesos with Jobs in Cluster Mode Documentation

2015-09-17 Thread Alan Braithwaite
Small update: I found the --properties-file spark-submit parameter by reading
the code, and that seems to work, but it appears to be undocumented on the
submitting-applications doc page.

- Alan

On Thu, Sep 17, 2015 at 12:39 PM, Alan Braithwaite 
wrote:

> One other piece of information:
>
> We're using zookeeper for persistence and when we brought the dispatcher
> back online, it crashed on the same exception after loading the config from
> zookeeper.
>
> Cheers,
> - Alan
>
> On Thu, Sep 17, 2015 at 12:29 PM, Alan Braithwaite 
> wrote:
>
>> Hey All,
>>
>> To bump this thread once again, I'm having some trouble using the
>> dispatcher as well.
>>
>> I'm using Mesos Cluster Manager with Docker Executors.  I've deployed the
>> dispatcher as Marathon job.  When I submit a job using spark submit, the
>> dispatcher writes back that the submission was successful and then promptly
>> dies in marathon.  Looking at the logs reveals it was hitting the following
>> line:
>>
>> 398:  throw new SparkException("Executor Spark home
>> `spark.mesos.executor.home` is not set!")
>>
>> Which is odd because it's set in multiple places (SPARK_HOME,
>> spark.mesos.executor.home, spark.home, etc).  Reading the code, it
>> appears that the driver desc pulls only from the request and disregards any
>> other properties that may be configured.  Testing by passing --conf
>> spark.mesos.executor.home=/usr/local/spark on the command line to
>> spark-submit confirms this.  We're trying to isolate the number of places
>> where we have to set properties within spark and were hoping that it will
>> be possible to have this pull in the spark-defaults.conf from somewhere, or
>> at least allow the user to inform the dispatcher through spark-submit that
>> those properties will be available once the job starts.
>>
>> Finally, I don't think the dispatcher should crash in this event.  It
>> seems not exceptional that a job is misconfigured when submitted.
>>
>> Please direct me on the right path if I'm headed in the wrong direction.
>> Also let me know if I should open some tickets for these issues.
>>
>> Thanks,
>> - Alan
>>
>> On Fri, Sep 11, 2015 at 1:05 PM, Tim Chen  wrote:
>>
>>> Yes you can create an issue, or actually contribute a patch to update it
>>> :)
>>>
>>> Sorry the docs is a bit light, I'm going to make it more complete along
>>> the way.
>>>
>>> Tim
>>>
>>>
>>> On Fri, Sep 11, 2015 at 11:11 AM, Tom Waterhouse (tomwater) <
>>> tomwa...@cisco.com> wrote:
>>>
 Tim,

 Thank you for the explanation.  You are correct, my Mesos experience is
 very light, and I haven’t deployed anything via Marathon yet.  What you
 have stated here makes sense, I will look into doing this.

 Adding this info to the docs would be great.  Is the appropriate action
 to create an issue regarding improvement of the docs?  For those of us who
 are gaining the experience having such a pointer is very helpful.

 Tom

 From: Tim Chen 
 Date: Thursday, September 10, 2015 at 10:25 AM
 To: Tom Waterhouse 
 Cc: "user@spark.apache.org" 
 Subject: Re: Spark on Mesos with Jobs in Cluster Mode Documentation

 Hi Tom,

 Sorry the documentation isn't really rich, since it's probably assuming
 users understands how Mesos and framework works.

 First I need explain the rationale of why create the dispatcher. If
 you're not familiar with Mesos yet, each node in your datacenter is
 installed a Mesos slave where it's responsible for publishing resources and
 running/watching tasks, and Mesos master is responsible for taking the
 aggregated resources and scheduling them among frameworks.

 Frameworks are not managed by Mesos, as Mesos master/slave doesn't
 launch and maintain framework but assume they're launched and kept running
 on its own. All the existing frameworks in the ecosystem therefore all have
 their own ways to deploy, HA and persist state (e.g: Aurora, Marathon, 
 etc).

 Therefore, to introduce cluster mode with Mesos, we must create a
 framework that is long running that can be running in your datacenter, and
 can handle launching spark drivers on demand and handle HA, etc. This is
 what the dispatcher is all about.

 So the idea is that you should launch the dispatcher not on the client,
 but on a machine in your datacenter. In Mesosphere's DCOS we launch all
 frameworks and long running services with Marathon, and you can use
 Marathon to launch the Spark dispatcher.

 Then all clients instead of specifying the Mesos master URL (e.g:
 mesos://mesos.master:2181), then just talks to the dispatcher only
 (mesos://spark-dispatcher.mesos:7077), and the dispatcher will then start
 and watch the driver for you.

 Tim



 On 

Spark streaming to database exception handling

2015-09-17 Thread david w
I am using Spark Streaming to receive data from Kafka and then write the result RDD
to an external database inside foreachPartition(). Everything works fine; my
question is how we can guarantee no data loss if there is a database connection
failure, or some other exception happens while writing data to external storage.
Is there any way we can notify Spark Streaming to replay that RDD, or some
ack mechanism?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-streaming-to-database-exception-handling-tp24728.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Spark w/YARN Scheduling Questions...

2015-09-17 Thread Robert Saccone
Hello


We're running some experiments with Spark (v1.4) and have some questions
about its scheduling behavior.  I am hoping someone can answer the
following questions.


What is a task set?  It is mentioned in the Spark logs we get from our runs
but we can't seem to find a definition and how it relates to the Spark
concepts of Jobs, Stages, and Tasks in the online documentation.  This
makes it hard to reason about the scheduling behavior.


What is the heuristic used to kill executors when running Spark with YARN
in dynamic mode?  From the logs what we observe is that executors that have
work (task sets) queued to them are being killed and the work (task sets)
are being reassigned to other executors.  This seems inconsistent with the
online documentation which says that executors aren't killed until they've
been idle for a user configurable number of seconds.


We're using the Fair scheduler pooling with multiple pools each with
different weights, so is it correct that there are queues in the pools and
in the executors as well?


We can provide more details on our setup if desired.


Regards,

Rob Saccone

IBM T. J. Watson Center


Re: Null Value in DecimalType column of DataFrame

2015-09-17 Thread Yin Huai
As I mentioned before, the range of values of DecimalType(10, 10) is [0,
1). If you have a value 10.5 and you want to cast it to DecimalType(10,
10), I do not think there is any better return value than null.
It looks like DecimalType(10, 10) is not the right type for your use case. You
need a decimal type that has precision - scale >= 2.
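
For illustration, a minimal sketch of the difference, assuming a SQLContext named sqlContext as in the original test:

import org.apache.spark.sql.functions.lit
import org.apache.spark.sql.types.DecimalType

// assumes a SQLContext named sqlContext, as in the original test
val df = sqlContext.range(0, 1).select(
  lit("10.5").cast(DecimalType(10, 10)).as("p10_s10"),  // null: 10.5 is outside [0, 1)
  lit("10.5").cast(DecimalType(12, 10)).as("p12_s10"))  // 10.5000000000 fits, precision - scale >= 2
df.show()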

On Tue, Sep 15, 2015 at 6:39 AM, Dirceu Semighini Filho <
dirceu.semigh...@gmail.com> wrote:

>
> Hi Yin, posted here because I think it's a bug.
> So, it will return null and I can get a nullpointerexception, as I was
> getting. Is this really the expected behavior? Never seen something
> returning null in other Scala tools that I used.
>
> Regards,
>
>
> 2015-09-14 18:54 GMT-03:00 Yin Huai :
>
>> btw, move it to user list.
>>
>> On Mon, Sep 14, 2015 at 2:54 PM, Yin Huai  wrote:
>>
>>> A scale of 10 means that there are 10 digits at the right of the decimal
>>> point. If you also have precision 10, the range of your data will be [0, 1)
>>> and casting "10.5" to DecimalType(10, 10) will return null, which is
>>> expected.
>>>
>>> On Mon, Sep 14, 2015 at 1:42 PM, Dirceu Semighini Filho <
>>> dirceu.semigh...@gmail.com> wrote:
>>>
 Hi all,
 I'm moving from spark 1.4 to 1.5, and one of my tests is failing.
 It seems that there was some changes in org.apache.spark.sql.types.
 DecimalType

 This ugly code is a little sample to reproduce the error, don't use it
 into your project.

 test("spark test") {
   val file = 
 context.sparkContext().textFile(s"${defaultFilePath}Test.csv").map(f => 
 Row.fromSeq({
 val values = f.split(",")
 
 Seq(values.head.toString.toInt,values.tail.head.toString.toInt,BigDecimal(values.tail.tail.head),
 values.tail.tail.tail.head)}))

   val structType = StructType(Seq(StructField("id", IntegerType, false),
 StructField("int2", IntegerType, false), StructField("double",

  DecimalType(10,10), false),


 StructField("str2", StringType, false)))

   val df = context.sqlContext.createDataFrame(file,structType)
   df.first
 }

 The content of the file is:

 1,5,10.5,va
 2,1,0.1,vb
 3,8,10.0,vc

 The problem resides in DecimalType; before 1.5 the scale wasn't
 required. Now when using  DecimalType(12,10) it works fine, but using
 DecimalType(10,10) the Decimal values
 10.5 became null, and the 0.1 works.

 Is there anybody working with DecimalType for 1.5.1?

 Regards,
 Dirceu


>>>
>>
>
>


Re: Can we do dataframe.query like Pandas dataframe in spark?

2015-09-17 Thread Rex X
very cool! Thank you, Michael.


On Thu, Sep 17, 2015 at 11:00 AM, Michael Armbrust 
wrote:

> from pyspark.sql.functions import *
>
> ​
>
> df = sqlContext.range(10).select(rand().alias("a"), rand().alias("b"))
>
> df.where("a > b").show()
>
> +------------------+-------------------+
> |                 a|                  b|
> +------------------+-------------------+
> |0.6697439215581628|0.23420961030968923|
> |0.9248996796756386| 0.4146647917936366|
> +------------------+-------------------+
>
> On Thu, Sep 17, 2015 at 9:32 AM, Rex X  wrote:
>
>> With Pandas dataframe
>> ,
>> we can do query:
>>
>> >>> from numpy.random import randn>>> from pandas import DataFrame>>> df = 
>> >>> DataFrame(randn(10, 2), columns=list('ab'))>>> df.query('a > b')
>>
>>
>> This SQL-select-like query is very convenient. Can we do similar thing
>> with the new dataframe of spark?
>>
>>
>> Best,
>> Rex
>>
>
>


Cache after filter Vs Writing back to HDFS

2015-09-17 Thread Gavin Yue
For a large dataset, I want to filter out something and then do the
computing intensive work.

What I am doing now:

Data.filter(somerules).cache()
Data.count()

Data.map(timeintensivecompute)

But this sometimes takes an unusually long time due to cache misses and
recalculation.

So I changed to this way.

Data.filter(somerules).saveAsTextFile()

sc.textFile().map(timeintensivecompute)

The second one is even faster.

How could I tune the job to reach maximum performance?

Thank you.
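
As an aside, a middle ground between the two approaches above is to persist the filtered data with a storage level that spills to disk instead of recomputing on cache misses; a minimal sketch, assuming the same Data, somerules and timeintensivecompute as above:

import org.apache.spark.storage.StorageLevel

// assumes the same Data, somerules and timeintensivecompute as above
val filtered = Data.filter(somerules)
  .persist(StorageLevel.MEMORY_AND_DISK)  // partitions that don't fit in memory spill to disk
filtered.count()                          // materialize the persisted data once
filtered.map(timeintensivecompute)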


Distribute JMS receiver jobs on YARN

2015-09-17 Thread nibiau
Hello,
I have spark application with a JMS receiver.
Basically my application does :


JavaDStream<String> incoming_msg = customReceiverStream.map(
new Function<JMSEvent, String>()
{
public String call(JMSEvent jmsEvent)
{
return jmsEvent.getText();
}
}
);
incoming_msg.foreachRDD( new Function<JavaRDD<String>, Void>() {
public Void call(JavaRDD<String> rdd) throws Exception {
rdd.foreachPartition(new VoidFunction<Iterator<String>>() {
public void call(Iterator<String> msg) throws Exception {
while (msg.hasNext()) {
// insert msg in MongoDB
}
.

It works fine in standalone, but now I want to distribute it inside a YARN 
cluster of 4 nodes.

Please could you explain how the distribution will/should be done on the
cluster?
I don't understand whether each node will consume the JMS queue, or whether the
master node will consume the JMS queue and the message set will be distributed
over the cluster nodes.

Please help me, it is not clear.

Tks
Nicolas
 


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark streaming to database exception handling

2015-09-17 Thread Cody Koeninger
If you fail the task (throw an exception) it will be retried
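
For illustration, a minimal sketch of that pattern with a JDBC sink (the DStream, JDBC URL and table name are hypothetical placeholders):

import java.sql.DriverManager
import org.apache.spark.streaming.dstream.DStream

// The stream, JDBC URL and table name are hypothetical placeholders.
def saveToDb(stream: DStream[String], jdbcUrl: String): Unit = {
  stream.foreachRDD { rdd =>
    rdd.foreachPartition { records =>
      val conn = DriverManager.getConnection(jdbcUrl)  // one connection per partition
      try {
        val stmt = conn.prepareStatement("INSERT INTO events (payload) VALUES (?)")
        records.foreach { r =>
          stmt.setString(1, r)
          stmt.executeUpdate()
        }
        // Do not swallow exceptions here: letting a connection/write failure
        // propagate fails the task, and Spark retries it (up to
        // spark.task.maxFailures), so writes should be idempotent or
        // deduplicated downstream.
      } finally {
        conn.close()
      }
    }
  }
}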

On Thu, Sep 17, 2015 at 4:56 PM, david w  wrote:

> I am using spark stream to receive data from kafka, and then write result
> rdd
> to external database inside foreachPartition(). All thing works fine, my
> question is how can we handle no data loss if there is database connection
> failure, or other exception happened during write data to external storage.
> Is there any way we can notify spark streaming to replay that RDD or some
> ack mechanism?
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-streaming-to-database-exception-handling-tp24728.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Spark Streaming kafka directStream value decoder issue

2015-09-17 Thread srungarapu vamsi
If I understand correctly, I guess you are suggesting that I do this:

val kafkaDStream  =
KafkaUtils.createDirectStream[String,Array[Byte],StringDecoder,DefaultDecoder](ssc,
kafkaConf, Set(topics))

kafkaDStream.map{
  case(devId,byteArray) =>(devId,KafkaGenericEvent.parseFrom(byteArray))
} foreachRDD(rdd=>rdd.collect().map{
  case(devId,genericEvent)=>{
println(genericEvent)
  }
})

I read from Kafka as a byte array => applied a transformation from the
byte array to the custom class => printed the custom class for debugging purposes.

But this is not helping me, i.e. I am getting an empty object with default
values when I print "genericEvent".

Please correct me if I did not get what you are suggesting me to try.
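
For reference, a minimal sketch of the flatMap/Try variant Adrian mentions below, which drops messages that fail to parse instead of failing the batch (assuming the same kafkaDStream of (String, Array[Byte]) pairs as above):

import scala.util.Try

// assumes the same kafkaDStream of (String, Array[Byte]) pairs as above
val events = kafkaDStream.flatMap { case (devId, byteArray) =>
  // Try(...).toOption yields None for malformed payloads, so they are dropped
  Try(KafkaGenericEvent.parseFrom(byteArray)).toOption.map(event => (devId, event))
}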


On Thu, Sep 17, 2015 at 9:30 PM, Adrian Tanase  wrote:

> I guess what I'm asking is why not start with a Byte array like in the
> example that works (using the DefaultDecoder) then map over it and do the
> decoding manually like I'm suggesting below.
>
> Have you tried this approach? We have the same workflow (kafka => protobuf
> => custom class) and it works.
> If you expect invalid messages, you can use flatMap instead and wrap
> .parseFrom in a Try {} .toOption.
>
> Sent from my iPhone
>
> On 17 Sep 2015, at 18:23, srungarapu vamsi 
> wrote:
>
> @Adrian,
> I am doing collect for debugging purpose. But i have to use foreachRDD so
> that i can operate on top of this rdd and eventually save to DB.
>
> But my actual problem here is to properly convert Array[Byte] to my custom
> object.
>
> On Thu, Sep 17, 2015 at 7:04 PM, Adrian Tanase  wrote:
>
>> Why are you calling foreachRdd / collect in the first place?
>>
>> Instead of using a custom decoder, you should simply do – this is code
>> executed on the workers and allows the computation to continue. ForeachRdd
>> and collect are output operations and force the data to be collected on the
>> driver (assuming you don’t want that…)
>>
>> val events = kafkaDStream.map { case(devId,byteArray)=> 
>> KafkaGenericEvent.parseFrom(byteArray) }
>>
>> From: srungarapu vamsi
>> Date: Thursday, September 17, 2015 at 4:03 PM
>> To: user
>> Subject: Spark Streaming kafka directStream value decoder issue
>>
>> I am using KafkaUtils.createDirectStream to read the data from kafka bus.
>>
>> On the producer end, i am generating in the following way:
>>
>> props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
>> props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
>>   "org.apache.kafka.common.serialization.StringSerializer")
>> props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
>>   "org.apache.kafka.common.serialization.StringSerializer")
>> val producer = new KafkaProducer[String, KafkaGenericEvent](props)
>>
>> // Send some messages
>> println("Sending message")
>> val kafkaGenericEvent = new 
>> KafkaGenericEvent("event-id",EventType.site,"6",144050040L)
>> val message = new ProducerRecord[String, 
>> KafkaGenericEvent](topic,"myKey", kafkaGenericEvent)
>> producer.send(message)
>>   }
>>
>> I am connecting to kafka using the console consumer script and am able to
>> see proper data. The KafkaGenericEvent used in the above code is  the class
>> generated using ScalaBuff from a protobuff file.
>>
>> On the consumer end,
>> If i read the value as a normal byte array and the convert it into
>> KafkaGenericEvent in the following way, i get proper data:
>>
>>  val kafkaDStream  = 
>> KafkaUtils.createDirectStream[String,Array[Byte],StringDecoder,DefaultDecoder](ssc,
>>  kafkaConf, Set(topics))
>>
>> kafkaDStream.foreachRDD(rdd=>rdd.collect().map{
>>   case(devId,byteArray)=>{
>> println(KafkaGenericEvent.parseFrom(byteArray))
>>   }
>> })
>>
>> But if change the value to KafkaGenericEvent and use a custom decoder
>> like this:
>>
>> class KafkaGenericEventsDecoder(props: VerifiableProperties = null) extends 
>> Decoder[KafkaGenericEvent]{
>>  override def fromBytes(bytes:Array[Byte]):KafkaGenericEvent = {
>>KafkaGenericEvent.parseFrom(bytes)
>>  }
>> }
>>
>> and in consumer:
>>
>> val kafkaDStream  = 
>> KafkaUtils.createDirectStream[String,KafkaGenericEvent,StringDecoder,KafkaGenericEventsDecoder](ssc,
>>  kafkaConf, Set(topics))
>> kafkaDStream foreachRDD(rdd=>rdd.collect().map{
>>   case(devId,genericEvent)=>{
>> println(genericEvent)
>>   }
>> })
>>
>> Now, i my value object KafkaGenericEvent   is not created based on the
>> sent data instead it is creating an empty Object of KafkaGenericEvent with
>> default values.
>>
>> Even if i read the value as array of bytes in the createDirectStream and
>> than apply a transformation in the following way i am getting in correct
>> values:
>>
>> val kafkaDStream  = 
>> KafkaUtils.createDirectStream[String,Array[Byte],StringDecoder,DefaultDecoder](ssc,
>>  kafkaConf, Set(topics))
>>
>> 

Re: SparkR - calling as.vector() with rdd dataframe causes error

2015-09-17 Thread Luciano Resende
You can find some more info about SparkR at
https://spark.apache.org/docs/latest/sparkr.html

Looking at your sample app, with the provided content, you should be able
to run it on SparkR with something like:

#load SparkR with support for csv
sparkR --packages com.databricks:spark-csv_2.10:1.0.3

sc <- sparkR.init(sparkPackages="com.databricks:spark-csv_2.11:1.0.3")
sqlContext <- sparkRSQL.init(sc)

# get matrix from a file
file <- "file:///./matrix.csv"

#read it into variable
raw_data <- read.csv(file,sep=',',header=FALSE)

#convert to a local dataframe
localDF = data.frame(raw_data)

# create the rdd
rdd  <- createDataFrame(sqlContext,localDF)

printSchema(rdd)
head(rdd)

I was also trying to read the CSV directly in R:
df <-  read.df(sqlContext, file, "com.databricks.spark.csv",
header="false", sep=",")

That worked, but then I was getting exceptions when I tried
printSchema(df)
head(df)

15/09/17 18:33:30 ERROR CsvRelation$: Exception while parsing line: 7,8,9.
java.lang.ClassCastException: java.lang.String cannot be cast to
org.apache.spark.unsafe.types.UTF8String
at
org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow$class.getUTF8String(rows.scala:45)
at
org.apache.spark.sql.catalyst.expressions.GenericMutableRow.getUTF8String(rows.scala:247)
at
org.apache.spark.sql.catalyst.expressions.BoundReference.eval(BoundAttribute.scala:49)
at
org.apache.spark.sql.catalyst.expressions.UnaryExpression.eval(Expression.scala:247)
at
org.apache.spark.sql.catalyst.expressions.InterpretedMutableProjection.apply(Projection.scala:82)
at
org.apache.spark.sql.catalyst.expressions.InterpretedMutableProjection.apply(Projection.scala:61)
at
com.databricks.spark.csv.CsvRelation$$anonfun$com$databricks$spark$csv$CsvRelation$$parseCSV$1.apply(CsvRelation.scala:150)
at
com.databricks.spark.csv.CsvRelation$$anonfun$com$databricks$spark$csv$CsvRelation$$parseCSV$1.apply(CsvRelation.scala:130)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:308)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at
scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
at
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
at
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
at scala.collection.AbstractIterator.to(Iterator.scala:1157)
at
scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
at
scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
at
org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:215)
at
org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:215)
at
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1843)
at
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1843)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:88)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)

I will investigate this further and create a jira if necessary.

On Wed, Sep 16, 2015 at 11:22 PM, Sun, Rui  wrote:

> The existing algorithms operating on R data.frame can't simply operate on
> SparkR DataFrame. They have to be re-implemented to be based on SparkR
> DataFrame API.
>
> -Original Message-
> From: ekraffmiller [mailto:ellen.kraffmil...@gmail.com]
> Sent: Thursday, September 17, 2015 3:30 AM
> To: user@spark.apache.org
> Subject: SparkR - calling as.vector() with rdd dataframe causes error
>
> Hi,
> I have a library of clustering algorithms that I'm trying to run in the
> SparkR interactive shell. (I am working on a proof of concept for a
> document classification tool.) Each algorithm takes a term document matrix
> in the form of a dataframe.  When I pass the method a local dataframe, the
> clustering algorithm works correctly, but when I pass it a spark rdd, it
> gives an error trying to coerce the data into a vector.  Here is the code,
> that I'm calling within SparkR:
>
> # get matrix from a file
> file <-
>
> "/Applications/spark-1.5.0-bin-hadoop2.6/examples/src/main/resources/matrix.csv"
>
> #read it 

Performance changes quite large

2015-09-17 Thread Gavin Yue
I am trying to parse quite a lot of large JSON files.

At the beginning, I was doing it like this:

textFile(path).map(parseJson(line)).count()

For each file (800-900 MB), it would take roughly 1 min to finish.

I then changed the code to:

val rawData = textFile(path)
rawData.cache()
rawData.count()

rawData.map(parseJson(line)).count()

So for the first count action, it would take 2 secs for each file/task,
and the parsing would take another 2-4 secs.

How could the time change so much, from 1 min to 4-6 secs?

The problem is that I do not have enough memory to cache everything. I am using
the Jackson JSON parser that comes with Spark.


Please share your advice on this.

Thank you !


Re: Spark on Mesos with Jobs in Cluster Mode Documentation

2015-09-17 Thread Timothy Chen
Hi Alan,

If I understand correctly, you are setting the executor home when you launch the
dispatcher and not in the configuration when you submit the job, and expect it to
inherit that configuration?

When I worked on the dispatcher I was assuming all configuration is passed to
the dispatcher to launch the job exactly how you would need to launch it in
client mode.

But indeed it shouldn't crash the dispatcher; I'll take a closer look when I get a
chance.

Can you recommend changes on the documentation, either in email or a PR?

Thanks!

Tim

Sent from my iPhone

> On Sep 17, 2015, at 12:29 PM, Alan Braithwaite  wrote:
> 
> Hey All,
> 
> To bump this thread once again, I'm having some trouble using the dispatcher 
> as well.
> 
> I'm using Mesos Cluster Manager with Docker Executors.  I've deployed the 
> dispatcher as Marathon job.  When I submit a job using spark submit, the 
> dispatcher writes back that the submission was successful and then promptly 
> dies in marathon.  Looking at the logs reveals it was hitting the following 
> line:
> 
> 398:  throw new SparkException("Executor Spark home 
> `spark.mesos.executor.home` is not set!")
> 
> Which is odd because it's set in multiple places (SPARK_HOME, 
> spark.mesos.executor.home, spark.home, etc).  Reading the code, it appears 
> that the driver desc pulls only from the request and disregards any other 
> properties that may be configured.  Testing by passing --conf 
> spark.mesos.executor.home=/usr/local/spark on the command line to 
> spark-submit confirms this.  We're trying to isolate the number of places 
> where we have to set properties within spark and were hoping that it will be 
> possible to have this pull in the spark-defaults.conf from somewhere, or at 
> least allow the user to inform the dispatcher through spark-submit that those 
> properties will be available once the job starts. 
> 
> Finally, I don't think the dispatcher should crash in this event.  It seems 
> not exceptional that a job is misconfigured when submitted.
> 
> Please direct me on the right path if I'm headed in the wrong direction.  
> Also let me know if I should open some tickets for these issues.
> 
> Thanks,
> - Alan
> 
>> On Fri, Sep 11, 2015 at 1:05 PM, Tim Chen  wrote:
>> Yes you can create an issue, or actually contribute a patch to update it :)
>> 
>> Sorry the docs is a bit light, I'm going to make it more complete along the 
>> way.
>> 
>> Tim
>> 
>> 
>>> On Fri, Sep 11, 2015 at 11:11 AM, Tom Waterhouse (tomwater) 
>>>  wrote:
>>> Tim,
>>> 
>>> Thank you for the explanation.  You are correct, my Mesos experience is 
>>> very light, and I haven’t deployed anything via Marathon yet.  What you 
>>> have stated here makes sense, I will look into doing this.
>>> 
>>> Adding this info to the docs would be great.  Is the appropriate action to 
>>> create an issue regarding improvement of the docs?  For those of us who are 
>>> gaining the experience having such a pointer is very helpful.
>>> 
>>> Tom
>>> 
>>> From: Tim Chen 
>>> Date: Thursday, September 10, 2015 at 10:25 AM
>>> To: Tom Waterhouse 
>>> Cc: "user@spark.apache.org" 
>>> Subject: Re: Spark on Mesos with Jobs in Cluster Mode Documentation
>>> 
>>> Hi Tom,
>>> 
>>> Sorry the documentation isn't really rich, since it's probably assuming 
>>> users understands how Mesos and framework works.
>>> 
>>> First I need explain the rationale of why create the dispatcher. If you're 
>>> not familiar with Mesos yet, each node in your datacenter is installed a 
>>> Mesos slave where it's responsible for publishing resources and 
>>> running/watching tasks, and Mesos master is responsible for taking the 
>>> aggregated resources and scheduling them among frameworks. 
>>> 
>>> Frameworks are not managed by Mesos, as Mesos master/slave doesn't launch 
>>> and maintain framework but assume they're launched and kept running on its 
>>> own. All the existing frameworks in the ecosystem therefore all have their 
>>> own ways to deploy, HA and persist state (e.g: Aurora, Marathon, etc).
>>> 
>>> Therefore, to introduce cluster mode with Mesos, we must create a framework 
>>> that is long running that can be running in your datacenter, and can handle 
>>> launching spark drivers on demand and handle HA, etc. This is what the 
>>> dispatcher is all about.
>>> 
>>> So the idea is that you should launch the dispatcher not on the client, but 
>>> on a machine in your datacenter. In Mesosphere's DCOS we launch all 
>>> frameworks and long running services with Marathon, and you can use 
>>> Marathon to launch the Spark dispatcher.
>>> 
>>> Then all clients instead of specifying the Mesos master URL (e.g: 
>>> mesos://mesos.master:2181), then just talks to the dispatcher only 
>>> (mesos://spark-dispatcher.mesos:7077), and the dispatcher will then start 
>>> and watch the driver for you.
>>> 
>>> Tim

DecisionTree hangs, then crashes

2015-09-17 Thread jluan
See my Stack Overflow question for better-formatted info:
http://stackoverflow.com/questions/32621267/spark-1-5-0-hangs-running-randomforest

  

I am trying to run a basic decision tree from MLlib. My Spark version is
1.4.0. My configuration is:
EC2 r3.4xlarge (1 master, 2 workers)
146.6 GB Total

spark.executor.memory   10m
spark.driver.memory 9m
spark.driver.maxResultSize 0
spark.storage.memoryFraction 0.6
spark.default.parallelism 64


I have loaded a test dataset of LabeledPoint values, with each LabeledPoint
containing a SparseVector features. My LabeledPoint object looks like this:
LabeledPoint(0.0, (1080963,[44673,64508,65588,122081,306819,306820,382530
...], [1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0 ...]))

Additional information on each *RDD item*:
>>> d = data.first()
>>> d.label
0.0
>>> d.features.size
1080963
>>> len(d.features.values)
2286

My *model training* is very standard:
(trainingData, testData) = data.randomSplit([0.7, 0.3])
model = RandomForest.trainClassifier(trainingData, numClasses=2,
categoricalFeaturesInfo={},
 numTrees=3,
featureSubsetStrategy="auto",
 impurity='gini', maxDepth=4,
maxBins=32)


My *Error trace* is as follows:
15/09/17 19:36:13 INFO storage.BlockManagerInfo: Removed broadcast_4_piece0
on 10.0.28.233:38432 in memory (size: 4.4 KB, free: 45.5 GB)
15/09/17 19:36:13 INFO storage.BlockManagerInfo: Removed broadcast_4_piece0
on 10.0.28.28:58416 in memory (size: 4.4 KB, free: 50.5 GB)
15/09/17 19:36:13 INFO storage.BlockManager: Removing RDD 10
15/09/17 19:36:13 INFO spark.ContextCleaner: Cleaned RDD 10
15/09/17 19:36:13 INFO storage.BlockManagerInfo: Removed broadcast_3_piece0
on 10.0.28.233:38432 in memory (size: 4.2 KB, free: 45.5 GB)
15/09/17 19:36:13 INFO storage.BlockManagerInfo: Removed broadcast_3_piece0
on 10.0.28.28:58416 in memory (size: 4.2 KB, free: 50.5 GB)
15/09/17 19:36:13 INFO storage.BlockManagerInfo: Removed broadcast_2_piece0
on 10.0.28.233:38432 in memory (size: 3.7 KB, free: 45.5 GB)
15/09/17 19:36:13 INFO storage.BlockManagerInfo: Removed broadcast_2_piece0
on 10.0.28.28:58416 in memory (size: 3.7 KB, free: 50.5 GB)
15/09/17 19:36:13 INFO storage.BlockManagerInfo: Removed broadcast_2_piece0
on 10.0.28.31:56554 in memory (size: 3.7 KB, free: 50.5 GB)
15/09/17 20:33:43 INFO rdd.MapPartitionsRDD: Removing RDD 28 from
persistence list
15/09/17 20:33:43 INFO storage.BlockManager: Removing RDD 28
Traceback (most recent call last):
  File "", line 1, in 
  File "random_forest_spark.py", line 144, in trainModel
impurity='gini', maxDepth=4, maxBins=32)
  File "/root/spark/python/pyspark/mllib/tree.py", line 352, in
trainClassifier
maxDepth, maxBins, seed)
  File "/root/spark/python/pyspark/mllib/tree.py", line 270, in _train
maxDepth, maxBins, seed)
  File "/root/spark/python/pyspark/mllib/common.py", line 128, in
callMLlibFunc
return callJavaFunc(sc, api, *args)
  File "/root/spark/python/pyspark/mllib/common.py", line 121, in
callJavaFunc
return _java2py(sc, func(*args))
  File "/root/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py",
line 538, in __call__
  File "/root/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line
300, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling
o104.trainRandomForestModel.
: java.lang.OutOfMemoryError
at
java.io.ByteArrayOutputStream.hugeCapacity(ByteArrayOutputStream.java:123)
at
java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:117)
at
java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
at
java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
at
java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1876)
at
java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1785)
at
java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1188)
at
java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
at
org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:44)
at
org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:81)
at
org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:312)
at
org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:305)
at
org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:132)
at org.apache.spark.SparkContext.clean(SparkContext.scala:1891)
at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:294)
at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:293)
at
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:148)
at

Re: Spark w/YARN Scheduling Questions...

2015-09-17 Thread Saisai Shao
A task set is a set of tasks within one stage.

An executor will be killed when it has been idle for a period of time (the default is
60s). The problem you mentioned is a bug: the scheduler should not allocate tasks
to these to-be-killed executors. I think it is fixed in 1.5.

Thanks
Saisai


On Thu, Sep 17, 2015 at 3:31 PM, Robert Saccone 
wrote:

> Hello
>
>
> We're running some experiments with Spark (v1.4) and have some questions
> about its scheduling behavior.  I am hoping someone can answer the
> following questions.
>
>
> What is a task set?  It is mentioned in the Spark logs we get from our
> runs but we can't seem to find a definition and how it relates to the Spark
> concepts of Jobs, Stages, and Tasks in the online documentation.  This
> makes it hard to reason about the scheduling behavior.
>
>
> What is the heuristic used to kill executors when running Spark with YARN
> in dynamic mode?  From the logs what we observe is that executors that have
> work (task sets) queued to them are being killed and the work (task sets)
> are being reassigned to other executors.  This seems inconsistent with the
> online documentation which says that executors aren't killed until they've
> been idle for a user configurable number of seconds.
>
>
> We're using the Fair scheduler pooling with multiple pools each with
> different weights, so is it correct that there are queues in the pools and
> in the executors as well?
>
>
> We can provide more details on our setup if desired.
>
>
> Regards,
>
> Rob Saccone
>
> IBM T. J. Watson Center
>


Re: A way to timeout and terminate a laggard 'Stage' ?

2015-09-17 Thread Hemant Bhanawat
Having the driver time out laggards seems like a reasonable way of handling
them. Are there any challenges because of which the driver does not do it
today? Is there a JIRA for this? I couldn't find one.
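
For what it's worth, a minimal sketch of how an application can approximate this today from the driver side, using job groups and a watchdog thread; the group id and timeout are arbitrary, and this is not a built-in per-stage timeout:

import java.util.concurrent.{Executors, TimeUnit}
import org.apache.spark.SparkContext

// Cancels the whole job group if `body` runs longer than timeoutSecs.
def runWithTimeout[T](sc: SparkContext, timeoutSecs: Long)(body: => T): T = {
  val groupId = s"watchdog-${System.nanoTime()}"
  sc.setJobGroup(groupId, "job guarded by watchdog", interruptOnCancel = true)
  val watchdog = Executors.newSingleThreadScheduledExecutor()
  watchdog.schedule(new Runnable {
    override def run(): Unit = sc.cancelJobGroup(groupId)  // kills the laggard jobs in the group
  }, timeoutSecs, TimeUnit.SECONDS)
  try body finally {
    watchdog.shutdownNow()
    sc.clearJobGroup()
  }
}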





On Tue, Sep 15, 2015 at 12:07 PM, Akhil Das 
wrote:

> As of now i think its a no. Not sure if its a naive approach, but yes you
> can have a separate program to keep an eye in the webui (possibly parsing
> the content) and make it trigger the kill task/job once it detects a lag.
> (Again you will have to figure out the correct numbers before killing any
> job)
>
> Thanks
> Best Regards
>
> On Mon, Sep 14, 2015 at 10:40 PM, Dmitry Goldenberg <
> dgoldenberg...@gmail.com> wrote:
>
>> Is there a way in Spark to automatically terminate laggard stages, i.e.
>> ones that appear to be hanging?  In other words, is there a timeout for
>> the processing of a given RDD?
>>
>> In the Spark GUI, I see the "kill" function for a given stage under
>> "Details for Job <...>".
>>
>> Is there something in Spark that would identify and kill laggards
>> proactively?
>>
>> Thanks.
>>
>
>


Re: Best way to merge final output part files created by Spark job

2015-09-17 Thread MEETHU MATHEW
Try coalesce(1) before writing.

Thanks & Regards,
Meethu M
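
A minimal sketch of that suggestion for plain text output; the paths and app
name are placeholders:

    import org.apache.spark.{SparkConf, SparkContext}

    val sc = new SparkContext(new SparkConf().setAppName("merge-output-example"))

    // coalesce(1) collapses the result to a single partition, so the save
    // produces one part file instead of many. Only sensible when the final
    // output is small enough to be written by a single task.
    sc.textFile("hdfs:///data/input")
      .coalesce(1)
      .saveAsTextFile("hdfs:///data/output-merged")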


On Tuesday, 15 September 2015 6:49 AM, java8964  wrote:

For text files this merge works fine, but for binary formats like ORC,
Parquet, or Avro, I'm not sure it will work.
These kinds of formats are in fact not appendable, as they write their
metadata either at the head or at the tail of the file.
You have to use the format-specific API to merge the data.
Yong
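
One workable alternative for ORC, rather than a true file-level merge, is to
read the small part files back through Spark SQL and rewrite them with fewer
partitions. A sketch under the assumption of Spark 1.4/1.5, where ORC support
comes through HiveContext; the paths and app name are placeholders:

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.hive.HiveContext

    val sc = new SparkContext(new SparkConf().setAppName("orc-rewrite-example"))
    val sqlContext = new HiveContext(sc)

    // Read all the small ORC part files, then write them back out as a
    // handful of larger files. This rewrites the data rather than merging
    // the existing files in place.
    sqlContext.read.format("orc").load("hdfs:///data/orc-small-parts")
      .coalesce(1)   // or a small number of partitions
      .write.format("orc").save("hdfs:///data/orc-merged")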

Date: Mon, 14 Sep 2015 09:10:33 +0200
Subject: Re: Best way to merge final output part files created by Spark job
From: gmu...@stratio.com
To: umesh.ka...@gmail.com
CC: user@spark.apache.org

Hi, check out the FileUtil.copyMerge function in the Hadoop API. It's simple:

   - Get the Hadoop configuration from the Spark context: FileSystem fs =
FileSystem.get(sparkContext.hadoopConfiguration());
   - Create a new Path for the destination and one for the source directory.
   - Call copyMerge: FileUtil.copyMerge(fs, inputPath, fs, destPath, true,
sparkContext.hadoopConfiguration(), null);
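
Putting those steps together, a self-contained sketch in Scala (paths and app
name are placeholders; note that copyMerge does a raw byte concatenation, so
it is only appropriate for plain text output, and the method was removed in
Hadoop 3):

    import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}
    import org.apache.spark.{SparkConf, SparkContext}

    val sc = new SparkContext(new SparkConf().setAppName("copy-merge-example"))
    val hadoopConf = sc.hadoopConfiguration
    val fs = FileSystem.get(hadoopConf)

    val srcDir  = new Path("hdfs:///data/output")             // directory of part-* files
    val dstFile = new Path("hdfs:///data/output-merged.txt")  // single merged file

    // deleteSource = true removes the part files after merging;
    // the final argument is an optional string appended after each file (none here).
    FileUtil.copyMerge(fs, srcDir, fs, dstFile, true, hadoopConf, null)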

2015-09-13 23:25 GMT+02:00 unk1102 :

Hi, I have a Spark job which creates around 500 part files inside each
directory I process, and I have thousands of such directories, so I need to
merge these small 500 part files. I am using spark.sql.shuffle.partitions set
to 500, and my final small files are ORC files. Is there a way to merge ORC
files in Spark? If not, please suggest the best way to merge files created by
a Spark job in HDFS. Thanks much.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Best-way-to-merge-final-output-part-files-created-by-Spark-job-tp24681.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org





-- 

Gaspar Muñoz 
@gmunozsoria

Vía de las dos Castillas, 33, Ática 4, 3ª Planta
28224 Pozuelo de Alarcón, Madrid
Tel: +34 91 352 59 42 // @stratiobd

Re: Spark on Mesos with Jobs in Cluster Mode Documentation

2015-09-17 Thread Alan Braithwaite
Hi Tim,

Thanks for the follow-up.  It's not so much that I expect the executor to
inherit the configuration of the dispatcher as that I *don't* expect the
dispatcher to make assumptions about the system environment of the executor
(since it lives in a Docker container).  I could potentially see a case where
you might want to explicitly forbid the defaults, but I can't think of one
right now.

Otherwise, I'm confused as to why the defaults in the Docker image for the
executor are just ignored.  I suppose it's the dispatcher's job to ensure the
*exact* configuration of the executor, regardless of the defaults set on the
executor's machine?  Is that the assumption being made?  I can understand
that in contexts which aren't Docker-driven, since jobs could be rolling out
in the middle of a config update.  I'm trying to think of this outside the
terms of just Mesos/Docker (since I'm fully aware that Docker doesn't rule
the world yet).

So I can see this from both perspectives now, and passing in the properties
file will probably work just fine for me.  But for my better understanding:
when the executor starts, will it read any of the environment that it's
executing in, or will it take only the properties given to it by the
dispatcher and nothing more?
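
For anyone following along, a sketch of the properties-file approach mentioned
above; every path, host, port, class, and jar name here is a placeholder
rather than something taken from this thread:

    # spark-defaults style file handed to spark-submit explicitly
    spark.mesos.executor.home   /usr/local/spark
    spark.executor.memory       2g

    # submit to the dispatcher in cluster mode, passing the properties file
    spark-submit \
      --deploy-mode cluster \
      --master mesos://dispatcher-host:7077 \
      --properties-file /etc/spark/spark-defaults.conf \
      --class com.example.MyJob \
      hdfs:///jars/my-job.jar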

Let me know if anything needs more clarification, and thanks for your Mesos
contribution to Spark!

- Alan

On Thu, Sep 17, 2015 at 5:03 PM, Timothy Chen  wrote:

> Hi Alan,
>
> If I understand correctly, you are setting the executor home when you launch
> the dispatcher, and not in the configuration when you submit the job, and
> you expect it to inherit that configuration?
>
> When I worked on the dispatcher I was assuming all configuration is passed
> to the dispatcher, so that it can launch the job exactly as you would need
> to launch it in client mode.
>
> But indeed it shouldn't crash the dispatcher; I'll take a closer look when I
> get a chance.
>
> Can you recommend changes to the documentation, either in email or a PR?
>
> Thanks!
>
> Tim
>
> Sent from my iPhone
>
> On Sep 17, 2015, at 12:29 PM, Alan Braithwaite 
> wrote:
>
> Hey All,
>
> To bump this thread once again, I'm having some trouble using the
> dispatcher as well.
>
> I'm using the Mesos cluster manager with Docker executors.  I've deployed
> the dispatcher as a Marathon job.  When I submit a job using spark-submit,
> the dispatcher writes back that the submission was successful and then
> promptly dies in Marathon.  Looking at the logs reveals it was hitting the
> following line:
>
> 398:  throw new SparkException("Executor Spark home
> `spark.mesos.executor.home` is not set!")
>
> Which is odd because it's set in multiple places (SPARK_HOME,
> spark.mesos.executor.home, spark.home, etc.).  Reading the code, it
> appears that the driver description pulls only from the request and
> disregards any other properties that may be configured.  Testing by passing
> --conf spark.mesos.executor.home=/usr/local/spark on the command line to
> spark-submit confirms this.  We're trying to limit the number of places
> where we have to set properties within Spark, and were hoping that it would
> be possible to have this pull in the spark-defaults.conf from somewhere, or
> at least allow the user to inform the dispatcher through spark-submit that
> those properties will be available once the job starts.
>
> Finally, I don't think the dispatcher should crash in this event.  A job
> being misconfigured at submission time hardly seems exceptional.
>
> Please direct me on the right path if I'm headed in the wrong direction.
> Also let me know if I should open some tickets for these issues.
>
> Thanks,
> - Alan
>
> On Fri, Sep 11, 2015 at 1:05 PM, Tim Chen  wrote:
>
>> Yes you can create an issue, or actually contribute a patch to update it
>> :)
>>
>> Sorry the docs are a bit light; I'm going to make them more complete along
>> the way.
>>
>> Tim
>>
>>
>> On Fri, Sep 11, 2015 at 11:11 AM, Tom Waterhouse (tomwater) <
>> tomwa...@cisco.com> wrote:
>>
>>> Tim,
>>>
>>> Thank you for the explanation.  You are correct, my Mesos experience is
>>> very light, and I haven’t deployed anything via Marathon yet.  What you
>>> have stated here makes sense; I will look into doing this.
>>>
>>> Adding this info to the docs would be great.  Is the appropriate action
>>> to create an issue regarding improvement of the docs?  For those of us who
>>> are gaining the experience, having such a pointer is very helpful.
>>>
>>> Tom
>>>
>>> From: Tim Chen 
>>> Date: Thursday, September 10, 2015 at 10:25 AM
>>> To: Tom Waterhouse 
>>> Cc: "user@spark.apache.org" 
>>> Subject: Re: Spark on Mesos with Jobs in Cluster Mode Documentation
>>>
>>> Hi Tom,
>>>
>>> Sorry the documentation isn't really rich, since it's probably assuming
>>> users understand how Mesos and frameworks work.
>>>
>>> First I need to explain the rationale for creating the dispatcher. If
>>> you're not