Re: Does the delegator map task of SparkLauncher need to stay alive until Spark job finishes ?

2016-10-22 Thread Elkhan Dadashov
I found the answer regarding logging in the JavaDoc of SparkLauncher:

"Currently, all applications are launched as child processes. The child's
stdout and stderr are merged and written to a logger (see
java.util.logging)."

One last question: does sparkAppHandle.getAppId() return an
org.apache.hadoop.mapred.JobID, which would make it easy to track in
YARN? Or is the appId just the Spark app name we assign?

If it is a JobID, then even if the SparkLauncher handle goes away, I can get
the job details by talking directly to the cluster manager.
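
For context, here is roughly how I am wiring it up - a minimal sketch using the
Spark 2.0 launcher API, where the jar path and main class are placeholders:

import org.apache.spark.launcher.{SparkAppHandle, SparkLauncher}

val handle = new SparkLauncher()
  .setAppResource("/path/to/my-spark-job.jar")   // placeholder
  .setMainClass("com.example.MySparkJob")        // placeholder
  .setMaster("yarn")
  .startApplication(new SparkAppHandle.Listener {
    // called whenever the handle's state changes; getAppId() stays null
    // until the cluster manager has reported an application id
    override def stateChanged(h: SparkAppHandle): Unit =
      println(s"state=${h.getState} appId=${h.getAppId}")
    override def infoChanged(h: SparkAppHandle): Unit = ()
  })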

Thanks.

On Sat, Oct 22, 2016 at 4:53 PM Elkhan Dadashov wrote:

> Thanks, Marcelo.
>
> One more question regarding getting logs.
>
> In the previous implementation of SparkLauncher we could read logs from:
>
> sparkLauncher.getInputStream()
> sparkLauncher.getErrorStream()
>
> What is the recommended way of getting logs and logging of Spark execution
> while using sparkLauncher#startApplication()?
>
> Thanks.
>
> On Tue, Oct 18, 2016 at 3:07 PM Marcelo Vanzin wrote:
>
> On Tue, Oct 18, 2016 at 3:01 PM, Elkhan Dadashov wrote:
> > Does my map task need to wait until the Spark job finishes?
>
> No...
>
> > Or is there any way that my map task can finish after launching the Spark
> > job, and I can still query and get the status of the Spark job outside of
> > the map task (or the failure reason, if it has failed)? Maybe by querying
> > the Spark job id?
>
> ...but if the SparkLauncher handle goes away, then you lose the
> ability to track the app's state, unless you talk directly to the
> cluster manager.
>
> > I also guess that if I want my Spark job to be killed when the
> > corresponding delegator map task is killed, my map task needs to stay
> > alive so that I still have the SparkAppHandle reference?
>
> Correct, unless you talk directly to the cluster manager.
>
> --
> Marcelo
>
>


Re: Does the delegator map task of SparkLauncher need to stay alive until Spark job finishes ?

2016-10-22 Thread Elkhan Dadashov
Thanks, Marcelo.

One more question regarding getting logs.

In the previous implementation of SparkLauncher we could read logs from:

sparkLauncher.getInputStream()
sparkLauncher.getErrorStream()

What is the recommended way of getting logs and logging of Spark execution
while using sparkLauncher#startApplication()?

Thanks.

On Tue, Oct 18, 2016 at 3:07 PM Marcelo Vanzin  wrote:

> On Tue, Oct 18, 2016 at 3:01 PM, Elkhan Dadashov wrote:
> > Does my map task need to wait until the Spark job finishes?
>
> No...
>
> > Or is there any way that my map task can finish after launching the Spark
> > job, and I can still query and get the status of the Spark job outside of
> > the map task (or the failure reason, if it has failed)? Maybe by querying
> > the Spark job id?
>
> ...but if the SparkLauncher handle goes away, then you lose the
> ability to track the app's state, unless you talk directly to the
> cluster manager.
>
> > I also guess that if I want my Spark job to be killed when the
> > corresponding delegator map task is killed, my map task needs to stay
> > alive so that I still have the SparkAppHandle reference?
>
> Correct, unless you talk directly to the cluster manager.
>
> --
> Marcelo
>


Re: [Spark 2.0.0] error when unioning to an empty dataset

2016-10-22 Thread Efe Selcuk
Ah, that looks similar. Next opportunity I get, I'm going to do a printSchema on
the two datasets and see whether they match up.

I assume that unioning the underlying RDDs doesn't run into this problem
because there's less type checking, or something along those lines?
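
For the record, the check I have in mind is roughly this minimal sketch (using
the same data and someCode names as in my earlier snippet; comparing field by
field so that nullability differences stand out):

val left = data.schema
val right = someCode.thatReturnsADataset().schema
left.printTreeString()
right.printTreeString()

// flag any field whose type or nullability differs between the two schemas
left.fields.zip(right.fields).foreach { case (l, r) =>
  if (l.dataType != r.dataType || l.nullable != r.nullable)
    println(s"mismatch on ${l.name}: $l vs $r")
}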

On Fri, Oct 21, 2016 at 3:39 PM Cheng Lian  wrote:

> Efe - You probably hit this bug:
> https://issues.apache.org/jira/browse/SPARK-18058
>
> On 10/21/16 2:03 AM, Agraj Mangal wrote:
>
> I have seen this error sometimes when the elements in the schema have
> different nullabilities. Could you print the schema for data and for
> someCode.thatReturnsADataset() and see if there is any difference between
> the two ?
>
> On Fri, Oct 21, 2016 at 9:14 AM, Efe Selcuk  wrote:
>
> Thanks for the response. What do you mean by "semantically" the same?
> They're both Datasets of the same type, which is a case class, so I would
> expect compile-time integrity of the data. Is there a situation where this
> wouldn't be the case?
>
> Interestingly enough, if I instead create an empty rdd with
> sparkContext.emptyRDD of the same case class type, it works!
>
> So something like:
> var data = spark.sparkContext.emptyRDD[SomeData]
>
> // loop
>   data = data.union(someCode.thatReturnsADataset().rdd)
> // end loop
>
> data.toDS //so I can union it to the actual Dataset I have elsewhere
>
> On Thu, Oct 20, 2016 at 8:34 PM Agraj Mangal  wrote:
>
> I believe this normally happens when Spark is unable to perform the union due
> to a "difference" in the schemas of the operands. Can you check whether the
> schemas of the two datasets are semantically the same?
>
> On Tue, Oct 18, 2016 at 9:06 AM, Efe Selcuk  wrote:
>
> Bump!
>
> On Thu, Oct 13, 2016 at 8:25 PM Efe Selcuk  wrote:
>
> I have a use case where I want to build a dataset based off of
> conditionally available data. I thought I'd do something like this:
>
> case class SomeData( ... ) // parameters are basic encodable types like
> strings and BigDecimals
>
> var data = spark.emptyDataset[SomeData]
>
> // loop, determining what data to ingest and process into datasets
>   data = data.union(someCode.thatReturnsADataset)
> // end loop
>
> However I get a runtime exception:
>
> Exception in thread "main" org.apache.spark.sql.AnalysisException:
> unresolved operator 'Union;
> at
> org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:40)
> at
> org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:58)
> at
> org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:361)
> at
> org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:67)
> at
> org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
> at
> org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:67)
> at
> org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:58)
> at
> org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:49)
> at org.apache.spark.sql.Dataset.<init>(Dataset.scala:161)
> at org.apache.spark.sql.Dataset.<init>(Dataset.scala:167)
> at org.apache.spark.sql.Dataset$.apply(Dataset.scala:59)
> at org.apache.spark.sql.Dataset.withTypedPlan(Dataset.scala:2594)
> at org.apache.spark.sql.Dataset.union(Dataset.scala:1459)
>
> Granted, I'm new to Spark, so this might be an anti-pattern, and I'm open to
> suggestions. However, it doesn't seem like I'm doing anything incorrect
> here; the types are correct. Searching for this error online returns
> results that are seemingly about working with DataFrames that have
> mismatching schemas or a different order of fields, and it seems like bug
> fixes have gone in for those cases.
>
> Thanks in advance.
> Efe
>
>
>
>
> --
> Thanks & Regards,
> Agraj Mangal
>
>
>
>
> --
> Thanks & Regards,
> Agraj Mangal
>
>
>


Re: RDD groupBy() then random sort each group ?

2016-10-22 Thread Koert Kuipers
groupBy always materializes the entire group (on disk or in memory) which
is why you should avoid it for large groups.

The key is to never materialize the grouped and shuffled data.

To see one approach to doing this, take a look at
https://github.com/tresata/spark-sorted

It's basically a combination of smart partitioning and secondary sort.
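
A rough sketch of the same idea with plain RDDs (no spark-sorted), assuming
model_id is a String and my_input_rdd / train_sgd are as in your snippet: key
by (model_id, random long), partition only on model_id, and let the shuffle
sort within each partition, so each model's rows arrive grouped but in random
order and a group never has to be held in memory.

import org.apache.spark.Partitioner
import scala.util.Random

// route rows by model_id only, ignoring the random part of the key
class ModelPartitioner(override val numPartitions: Int) extends Partitioner {
  def getPartition(key: Any): Int = key match {
    case (modelId, _) =>
      ((modelId.hashCode % numPartitions) + numPartitions) % numPartitions
  }
}

val keyed = my_input_rdd.map(x => ((x.model_id, Random.nextLong()), x))
val sorted = keyed.repartitionAndSortWithinPartitions(new ModelPartitioner(200))
// downstream, walk each partition's iterator with mapPartitions, cutting on
// model_id changes and streaming each model's randomly ordered rows into
// train_sgd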

On Oct 20, 2016 1:55 PM, "Yang"  wrote:

> in my application, I group training samples by their model_id (the input
> table contains training samples for 100k different models), and each group
> ends up having about 1 million training samples.
>
> I then feed each group of samples to a little logistic regression solver
> (SGD), but SGD requires the input data to be shuffled randomly (so that
> positive and negative samples are evenly distributed), so now I do
> something like
>
> my_input_rdd.groupBy(x => x.model_id).map { x =>
>   val (model_id, group_of_rows) = x
>   // group_of_rows is an Iterable, so it has to be pulled into a Seq to shuffle
>   (model_id, scala.util.Random.shuffle(group_of_rows.toSeq))
> }.map(x => (x._1, train_sgd(x._2)))
>
>
> the issue is that in the snippet above I had to explicitly call toSeq() on
> group_of_rows (which is an Iterable, not a Seq) in order to shuffle it. Now
> I have to load the entire 1 million rows into memory, and in practice I've
> seen my tasks OOM and GC time go crazy (about 50% of total run time). I
> suspect this toSeq() is the reason, since doing a simple count() on the
> groupBy() result works fine.
>
> I am planning to shuffle my_input_rdd first, then groupBy(), and skip the
> toSeq()/shuffle step. Intuitively the input RDD is already shuffled, so
> UNLESS groupBy() tries to do some sorting, the rows in each group SHOULD
> remain shuffled, but overall this remains rather flimsy.
>
> any ideas to do this more reliably?
>
> thanks!
>
>


Re: Issues with reading gz files with Spark Streaming

2016-10-22 Thread Nkechi Achara
I do not use rename; the files are written and then moved to a directory on
HDFS in gz format.
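
For reference, the move step could be done roughly like the minimal sketch
below (paths are placeholders), using an HDFS rename so the stream never sees
a half-written file, along the lines suggested in the quoted reply:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

val fs = FileSystem.get(new Configuration())
// write the gz file into a staging directory first, then rename it into the
// directory that the fileStream is watching
fs.rename(new Path("/tmp/staging/part-0001.gz"),
          new Path("/tmp/watched/part-0001.gz"))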

On 22 October 2016 at 15:14, Steve Loughran  wrote:

>
> > On 21 Oct 2016, at 15:53, Nkechi Achara  wrote:
> >
> > Hi,
> >
> > I am using Spark 1.5.0 to read gz files with textFileStream, but new
> > files dropped in the specified directory are not picked up. I know this
> > only happens with gz files, since when I extract a file into the specified
> > directory it is read in the next window and processed.
> >
> > My code is here:
> >
> > val comments = ssc.fileStream[LongWritable, Text,
> >   TextInputFormat]("file:///tmp/", (f: Path) => true, newFilesOnly = false).
> >   map(pair => pair._2.toString)
> > comments.foreachRDD(i => i.foreach(m => println(m)))
> >
> > Any idea why the gz files are not being recognized?
> >
> > Thanks in advance,
> >
> > K
>
> Are the files being written directly into the directory, or renamed in? You
> should be using rename() against a filesystem (not an object store) to make
> sure that a file isn't picked up while it is still being written.
>


Re: How does Spark determine in-memory partition count when reading Parquet ~files?

2016-10-22 Thread shea.parkes
Thank you for the reply, tosaurabh85.  We do tune and adjust our shuffle
partition count, but that was not influencing the reading of the parquet files
(the data is not shuffled as it is read, as I understand it).

My apologies: I actually received an answer, but it was not captured on the
mailing list archive here.  I'm posting the exchange below so that future
readers can find the answer as well:



On Wed, Oct 19, 2016 at 9:33 PM Michael Armbrust  wrote:
In Spark 2.0 we bin-pack small files into a single task to avoid overloading
the scheduler.  If you want a specific number of partitions you should
repartition.  If you want to disable this optimization you can set the file
open cost very high: spark.sql.files.openCostInBytes

My reply:

Thank you very much for that information, sir.  It does make sense; I just
did not find it in any release notes.  I will work to tune that parameter
appropriately for our workflow.
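
For completeness, a minimal sketch of the two levers mentioned above, assuming
Spark 2.0 and a SparkSession named spark (the path and numbers are placeholders,
not recommendations):

// raise the per-file open cost so that fewer small files get packed per task
spark.conf.set("spark.sql.files.openCostInBytes", (8 * 1024 * 1024).toString)

val df = spark.read.parquet("/path/to/parquet")  // placeholder path
// or simply ask for the partition count you want after reading
val repartitioned = df.repartition(200)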






Re: Writing to Parquet Job turns to wait mode after even completion of job

2016-10-22 Thread Steve Loughran

On 22 Oct 2016, at 00:48, Chetan Khatri wrote:

Hello Cheng,

Thank you for response.

I am using Spark 1.6.1, and I am writing around 350 gz parquet part files for
a single table. I processed around 180 GB of data using Spark.

Are you writing to GCS storage or to the local HDD?

Regarding options to set for performant reads against object-store-hosted
parquet data, also go for:

spark.sql.parquet.filterPushdown true
spark.sql.parquet.mergeSchema false
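
For example, roughly, on a 1.6 SQLContext (or the equivalent --conf flags at
spark-submit time):

sqlContext.setConf("spark.sql.parquet.filterPushdown", "true")
sqlContext.setConf("spark.sql.parquet.mergeSchema", "false")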






On Sat, Oct 22, 2016 at 3:41 AM, Cheng Lian wrote:

What version of Spark are you using, and how many output files does the job
write out?

By default, Spark versions before 1.6 (not inclusive) write Parquet summary
files when committing the job. This process reads footers from all Parquet
files in the destination directory and merges them together. This can be
particularly bad if you are appending a small amount of data to a large
existing Parquet dataset.

If that's the case, you may disable Parquet summary files by setting the Hadoop
configuration "parquet.enable.summary-metadata" to false.


Now I'm a bit mixed up. Should that be
spark.sql.parquet.enable.summary-metadata = false?
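
For reference, setting it at the Hadoop level would look roughly like this
(assuming the plain property name quoted above is the one Parquet reads):

sc.hadoopConfiguration.set("parquet.enable.summary-metadata", "false")
// or, equivalently, at submit time:
//   --conf spark.hadoop.parquet.enable.summary-metadata=false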


We've disabled it by default since 1.6.0

Cheng

On 10/21/16 1:47 PM, Chetan Khatri wrote:
Hello Spark Users,

I am writing around 10 GB of processed data to Parquet on a Google Cloud
machine with 1 TB of HDD, 102 GB of RAM, and 16 vCores.

Every time I write to parquet, the Spark UI shows that the stages succeeded,
but the spark shell holds the context in wait mode for almost 10 minutes; then
it clears the broadcast and accumulator shared variables.

Can we speed this up?

Thanks.

--
Yours Aye,
Chetan Khatri.
M.+91 7 80574
Data Science Researcher
INDIA




--
Yours Aye,
Chetan Khatri.
M.+91 7 80574
Data Science Researcher
INDIA



Re: Issues with reading gz files with Spark Streaming

2016-10-22 Thread Steve Loughran

> On 21 Oct 2016, at 15:53, Nkechi Achara  wrote:
> 
> Hi, 
> 
> I am using Spark 1.5.0 to read gz files with textFileStream, but new files
> dropped in the specified directory are not picked up. I know this only
> happens with gz files, since when I extract a file into the specified
> directory it is read in the next window and processed.
> 
> My code is here:
> 
> val comments = ssc.fileStream[LongWritable, Text,
>   TextInputFormat]("file:///tmp/", (f: Path) => true, newFilesOnly = false).
>   map(pair => pair._2.toString)
> comments.foreachRDD(i => i.foreach(m => println(m)))
> 
> Any idea why the gz files are not being recognized?
> 
> Thanks in advance,
> 
> K

Are the files being written directly into the directory, or renamed in? You
should be using rename() against a filesystem (not an object store) to make
sure that a file isn't picked up while it is still being written.




why is that two stages in apache spark are computing same thing?

2016-10-22 Thread maitraythaker
I have a spark optimization query that I have posted on StackOverflow, any
guidance on this would be appreciated.
Please follow the link below, I have explained the problem in depth here
with code.
http://stackoverflow.com/questions/40192302/why-is-that-two-stages-in-apache-spark-are-computing-same-thing

  







Fwd: Spark optimization problem

2016-10-22 Thread Maitray Thaker
Hi,
I have a question regarding Spark stage optimization. I have asked it in
more detail on Stack Overflow; please see the following link:
http://stackoverflow.com/questions/40192302/why-is-that-two-stages-in-apache-spark-are-computing-same-thing