Is spark-env.sh sourced by Application Master and Executor for Spark on YARN?

2018-01-02 Thread John Zhuge
Hi,

I am running Spark 2.0.0 and 2.1.1 on YARN in a Hadoop 2.7.3 cluster. Is
spark-env.sh sourced when starting the Spark AM container or the executor
container?

Saw this paragraph on
https://github.com/apache/spark/blob/master/docs/configuration.md:

> Note: When running Spark on YARN in cluster mode, environment variables need
> to be set using the spark.yarn.appMasterEnv.[EnvironmentVariableName]
> property in your conf/spark-defaults.conf file. Environment variables that
> are set in spark-env.sh will not be reflected in the YARN Application Master
> process in cluster mode. See the YARN-related Spark Properties for more
> information.


Does this mean spark-env.sh will not be sourced when starting the AM in cluster
mode? Does this paragraph apply to executors as well?

Thanks,
-- 
John Zhuge
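
For illustration, a minimal sketch of setting an environment variable for both
containers via conf/spark-defaults.conf (JAVA_HOME is only a placeholder
variable; the property prefixes are the documented spark.yarn.appMasterEnv.*
and spark.executorEnv.* forms):

    # Sketch for conf/spark-defaults.conf.
    # Environment for the YARN Application Master container (cluster mode):
    spark.yarn.appMasterEnv.JAVA_HOME   /usr/lib/jvm/java-1.8.0
    # Environment for the executor containers:
    spark.executorEnv.JAVA_HOME         /usr/lib/jvm/java-1.8.0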


Re: [Spark SQL] How to run a custom meta query for `ANALYZE TABLE`

2018-01-02 Thread Jörn Franke
Hi,

No, this is not possible with the current data source API. However, there is a
new data source API v2 on its way; maybe it will support it.

Alternatively, you can add a config option to your data source that calculates
the metadata after an insert.
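
As a hedged sketch of what such an option could look like from the caller's
side (the format name and the refreshStatsOnWrite option are hypothetical, not
existing Spark options):

    // Hypothetical sketch: a writer option the custom data source could expose
    // so that it refreshes its own statistics after each insert. Neither the
    // format name nor the option exist in Spark itself; "df" is a placeholder.
    df.write
      .format("com.example.mysource")
      .option("refreshStatsOnWrite", "true")
      .mode("append")
      .save()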

However, could you explain a bit more which database your data source targets
and when this meta query should be executed?

> On 3. Jan 2018, at 05:17, Jason Heo  wrote:
> 
> Hi,
> 
> I'm working on integrating Spark and a custom data source.
> 
> Most things go well with the nice Spark Data Source APIs (thanks to the
> well-designed APIs).
> 
> But one thing I couldn't resolve is how to execute a custom meta query for
> `ANALYZE TABLE`.
> 
> The custom data source I'm currently working on has a meta query, so we can
> get MIN/MAX/cardinality without a full scan.
> 
> What I want is that when `ANALYZE TABLE` is executed over the custom data
> source, the custom meta query is executed rather than a full scan.
> 
> If this is not possible, I'm considering inserting stats into metastore_db
> manually. Is there any API exposed to handle metastore_db (e.g. insert/delete
> meta db)?
> 
> Regards,
> 
> Jason




[Spark SQL] How to run a custom meta query for `ANALYZE TABLE`

2018-01-02 Thread Jason Heo
Hi,

I'm working on integrating Spark and a custom data source.

Most things go well with the nice Spark Data Source APIs (thanks to the
well-designed APIs).

But one thing I couldn't resolve is how to execute a custom meta query for
`ANALYZE TABLE`.

The custom data source I'm currently working on has a meta query, so we can
get MIN/MAX/cardinality without a full scan.

What I want is that when `ANALYZE TABLE` is executed over the custom data
source, the custom meta query is executed rather than a full scan.

If this is not possible, I'm considering inserting stats into metastore_db
manually. Is there any API exposed to handle metastore_db (e.g.
insert/delete meta db)?

Regards,

Jason
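
For reference, a minimal sketch of the built-in command in Spark 2.x, which
computes the statistics with a scan of the table and stores them in the
metastore (the table name is a placeholder):

    // Minimal sketch of built-in statistics collection in Spark 2.x.
    // "my_custom_table" is a placeholder table registered in the catalog.
    spark.sql("ANALYZE TABLE my_custom_table COMPUTE STATISTICS")
    // Inspect the detailed table information, which includes the recorded stats.
    spark.sql("DESCRIBE EXTENDED my_custom_table").show(100, false)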


Unclosed NingWSClient holds up a Spark application

2018-01-02 Thread Lalwani, Jayesh
I noticed some weird behavior with NingWSClient 2.4.3 when used with Spark.

Try this


  1.  Spin up spark-shell with play-ws 2.4.3 on the driver class path
  2.  Run this code:
import com.ning.http.client.AsyncHttpClientConfigBean
import play.api.libs.ws.ning.NingWSClient

val config = new AsyncHttpClientConfigBean()
config.setAcceptAnyCertificate(true)
config.setFollowRedirect(true)

// One minute expressed in milliseconds.
val oneMinute = 60000
config.setMaxConnectionLifeTime(oneMinute)
config.setConnectionTimeOut(oneMinute)
config.setIdleConnectionInPoolTimeout(oneMinute)
config.setReadTimeout(oneMinute)
config.setRequestTimeout(oneMinute)
config.setSslSessionTimeout(oneMinute)

val client = NingWSClient(config)

  3.  Quit spark-shell
  4.  spark-shell won't quit; it will just sit there
  5.  Open a new spark-shell, repeat the above code, and then call client.close()
  6.  Quit spark-shell
  7.  spark-shell quits normally

In the context of a Spark application, the application won't quit even though
the jobs are complete and your main method has returned. It will sit there with
the executors holding on to their cores. Looking at the thread dump, it seems
like NingWSClient starts a few threads, and those threads don't die even after
the application's main method returns. Calling client.close() cleans up the
background threads and the application can then exit normally.

Has anyone seen this before?
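
For illustration, a minimal sketch of the pattern that avoids the hang
(assuming play-ws 2.4.x on the classpath):

    import com.ning.http.client.AsyncHttpClientConfigBean
    import play.api.libs.ws.ning.NingWSClient

    val config = new AsyncHttpClientConfigBean()
    val client = NingWSClient(config)
    try {
      // ... issue HTTP requests with client.url(...) ...
    } finally {
      // Shuts down the underlying AsyncHttpClient and its worker threads so
      // the JVM (and the Spark application) can exit.
      client.close()
    }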




Re: Converting binary files

2018-01-02 Thread Lalwani, Jayesh
You can repartition your dataframe into a single partition so that all the data
lands in one place. However, doing this is perilous because you will end up
with all of your data on one node, and if you have too much data you will run
out of memory. In fact, any time you are thinking about putting data in a
single file, you should ask yourself: “Does this data fit into memory?”
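
For illustration, a minimal sketch of the single-file write ("df" is a
hypothetical DataFrame; only do this when the data comfortably fits on one
executor):

    // Sketch: a single-file write, so every row is funnelled through one task.
    // "df" is a hypothetical DataFrame and the output path is a placeholder.
    df.coalesce(1)
      .write
      .mode("overwrite")
      .csv("hdfs://1.2.3.4/output/single-file")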

The reason Spark is geared towards reading and writing data in a partitioned
manner is that, fundamentally, partitioning data is how you scale your
applications. Partitioned data allows Spark (or really any application designed
to scale on a cluster) to read data in parallel, process it, and write it out
without any bottleneck. Humans prefer all their data in a single file or table
because humans have a limited ability to keep track of a multitude of files.
Grid-enabled software hates single files, simply because there is no good way
for two nodes to read one large file without some sort of bottleneck.
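
As a small sketch of the usual partitioned write pattern ("df" and the
"event_date" column are hypothetical):

    // Sketch: a partitioned write. Each task writes its own part-file, and the
    // output is laid out as one directory per event_date value.
    df.write
      .partitionBy("event_date")
      .parquet("hdfs://1.2.3.4/output/events")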

Imagine a data processing pipeline that starts with some sort of ingestion and
transformation at one end and feeds into several analytical processes. Usually
there are humans at the end looking at the results of the analytics. These
humans love to get their analytics in a dashboard that gives them a high-level
view of the data. However, all the data processing systems that sit between the
input and the analytics prefer their data cut up into bite-sized chunks.

From: Christopher Piggott 
Date: Saturday, December 30, 2017 at 3:45 PM
To: "user@spark.apache.org" 
Subject: Converting binary files

I have been searching for examples, but not finding exactly what I need.

I am looking for the paradigm for using Spark 2.2 to convert a bunch of binary
files into a bunch of different binary files.  I'm starting with:

   val files = spark.sparkContext.binaryFiles("hdfs://1.2.3.4/input")

then convert them:

   val converted = files.map { case (filename, content) => filename -> convert(content) }

but I don't really want to save by 'partition'; I want to save each file under
its original name, but in a different directory, e.g. "converted/*".

I'm not quite sure how I'm supposed to do this within the framework of what's
available to me in SparkContext. Do I need to do it myself using the HDFS API?

It would seem like a pretty normal thing to do. Imagine, for instance, taking a
bunch of binary files, compressing them, and saving the compressed output to a
different directory. I feel like I'm missing something fundamental here.

--C
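
For illustration, a minimal sketch of doing this with the Hadoop FileSystem API
from inside the job, as the poster suspected (convert is the poster's own
function; the host and paths are placeholders):

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, Path}

    // Sketch: write each converted file under its original name into a new
    // directory. "convert" is assumed to be the poster's own
    // Array[Byte] => Array[Byte] function.
    val files = spark.sparkContext.binaryFiles("hdfs://1.2.3.4/input")
    files.foreach { case (filename, content) =>
      val outPath = new Path("hdfs://1.2.3.4/converted", new Path(filename).getName)
      val fs = FileSystem.get(outPath.toUri, new Configuration())
      val out = fs.create(outPath)
      try out.write(convert(content.toArray())) finally out.close()
    }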







Current way of using functions.window with Java

2018-01-02 Thread Anton Puzanov
I am writing a sliding-window analytics program and use the functions.window
function (
https://spark.apache.org/docs/2.1.0/api/java/org/apache/spark/sql/functions.html#window(org.apache.spark.sql.Column,%20java.lang.String,%20java.lang.String)
)
The code looks like this:

Column slidingWindow = functions.window(myDF.col("timestamp"), "24 hours", "1 seconds");
Dataset<Row> finalRes = myDF.groupBy(slidingWindow, myDF.col("user"))
    .agg(functions.collect_set("purchase").as("purchases"));


As you can see, in this use case I have a small step and a large window: with a
24-hour window and a 1-second slide, every row falls into 86,400 overlapping
windows. Code of the same flavor caused the following error (which, as I
understand it, is related to Java code generation):

Caused by: org.spark_project.guava.util.concurrent.ExecutionError:
java.lang.OutOfMemoryError: Java heap space
at 
org.spark_project.guava.cache.LocalCache$Segment.get(LocalCache.java:2261)
at org.spark_project.guava.cache.LocalCache.get(LocalCache.java:4000)
at org.spark_project.guava.cache.LocalCache.getOrLoad(LocalCache.java:4004)
at 
org.spark_project.guava.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4874)
at 
org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$.compile(CodeGenerator.scala:890)
at 
org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:357)
at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:114)
at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:114)
at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:135)
at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at 
org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:132)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:113)
at 
org.apache.spark.sql.execution.exchange.ShuffleExchange.prepareShuffleDependency(ShuffleExchange.scala:85)
at 
org.apache.spark.sql.execution.exchange.ShuffleExchange$$anonfun$doExecute$1.apply(ShuffleExchange.scala:121)
at 
org.apache.spark.sql.execution.exchange.ShuffleExchange$$anonfun$doExecute$1.apply(ShuffleExchange.scala:112)
at 
org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
... 77 more
Caused by: java.lang.OutOfMemoryError: Java heap space
at java.util.HashMap.resize(HashMap.java:703)
at java.util.HashMap.putVal(HashMap.java:628)
at java.util.HashMap.putMapEntries(HashMap.java:514)
at java.util.HashMap.putAll(HashMap.java:784)
at 
org.codehaus.janino.UnitCompiler.buildLocalVariableMap(UnitCompiler.java:3073)
at org.codehaus.janino.UnitCompiler.access$4900(UnitCompiler.java:206)
at 
org.codehaus.janino.UnitCompiler$8.visitLocalVariableDeclarationStatement(UnitCompiler.java:2958)
at 
org.codehaus.janino.UnitCompiler$8.visitLocalVariableDeclarationStatement(UnitCompiler.java:2926)
at 
org.codehaus.janino.Java$LocalVariableDeclarationStatement.accept(Java.java:2974)
at 
org.codehaus.janino.UnitCompiler.buildLocalVariableMap(UnitCompiler.java:2925)
at 
org.codehaus.janino.UnitCompiler.buildLocalVariableMap(UnitCompiler.java:3033)
at org.codehaus.janino.UnitCompiler.access$4400(UnitCompiler.java:206)
at 
org.codehaus.janino.UnitCompiler$8.visitSwitchStatement(UnitCompiler.java:2950)
at 
org.codehaus.janino.UnitCompiler$8.visitSwitchStatement(UnitCompiler.java:2926)
at org.codehaus.janino.Java$SwitchStatement.accept(Java.java:2866)
at 
org.codehaus.janino.UnitCompiler.buildLocalVariableMap(UnitCompiler.java:2925)
at 
org.codehaus.janino.UnitCompiler.buildLocalVariableMap(UnitCompiler.java:2982)
at org.codehaus.janino.UnitCompiler.access$3800(UnitCompiler.java:206)
at org.codehaus.janino.UnitCompiler$8.visitBlock(UnitCompiler.java:2944)
at org.codehaus.janino.UnitCompiler$8.visitBlock(UnitCompiler.java:2926)
at org.codehaus.janino.Java$Block.accept(Java.java:2471)
at 
org.codehaus.janino.UnitCompiler.buildLocalVariableMap(UnitCompiler.java:2925)
at 
org.codehaus.janino.UnitCompiler.buildLocalVariableMap(UnitCompiler.java:2999)
at org.codehaus.janino.UnitCompiler.access$4000(UnitCompiler.java:206)
at 
org.codehaus.janino.UnitCompiler$8.visitForStatement(UnitCompiler.java:2946)
at 
org.codehaus.janino.UnitCompiler$8.visitForStatement(UnitCompiler.java:2926)
at org.codehaus.janino.Java$ForStatement.accept(Java.java:2660)
at 
org.codehaus.janino.UnitCompiler.buildLocalVariableMap(UnitCompiler.java:2925)
at 
org.codehaus.janino.UnitCompiler.buildLocalVariableMap(UnitCompiler.java:2982)
at org.codehaus.janino.UnitCompiler.access$3800(UnitCompiler.java:206)
at org.codehaus.janino.UnitCompiler$8.visitBlock(UnitCompiler.java:2944)

When I run the code with really small data and window = 3 minutes,
step = 1 second, I get this error; there are tens of thousands of
generated-code lines like:
/* 30761 */   if 

Re: Spark on EMR suddenly stalling

2018-01-02 Thread Gourav Sengupta
Hi Jeroen,

In case you are using Hive partitions, how many partitions do you have?

Also, is there any chance you could post the code?

Regards,
Gourav Sengupta

On Tue, Jan 2, 2018 at 7:50 AM, Jeroen Miller 
wrote:

> Hello Gourav,
>
> On 30 Dec 2017, at 20:20, Gourav Sengupta wrote:
> > Please try to use the Spark UI in the way that AWS EMR recommends; it
> > should be available from the resource manager. I never ever had any
> > problem working with it. THAT HAS ALWAYS BEEN MY PRIMARY AND SOLE SOURCE
> > OF DEBUGGING.
>
> For some reason, sometimes there is absolutely nothing showing up in the
> Spark UI, or the UI is not refreshed, e.g. the UI says the current stage is
> #x while the logs show that stage #y (with y > x) is under way.
>
> It may very well be that the source of this problem lies between the
> keyboard and the chair, but if this is the case, I do not know how to solve
> this.
>
> > Also, I ALWAYS prefer the maximizeResourceAllocation setting in EMR to be
> > set to true.
>
> Thanks for the tip -- will try this setting in my next batch of
> experiments!
>
> JM
>
>
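
For reference, maximizeResourceAllocation is enabled through an EMR
configuration classification supplied at cluster creation; a minimal sketch of
the JSON (only the classification name and property are shown, everything else
omitted):

    [
      {
        "Classification": "spark",
        "Properties": { "maximizeResourceAllocation": "true" }
      }
    ]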