Re: Dynamic metric names

2019-05-07 Thread Roberto Coluccio
It would be a dream to have an easy-to-use dynamic metric system AND a
reliable counting system (accumulator-like) in Spark...

Thanks
Roberto

On Tue, May 7, 2019 at 3:54 AM Saisai Shao  wrote:

> I think the main reason why that was not merged is that Spark itself
> doesn't have such a requirement, and the metrics system is mainly used by
> Spark itself. Most of the needs come from custom sources/sinks, but
> Spark's MetricsSystem is not designed as a public API.
>
> I think we could revisit or improve that PR if there's a solid reason
> for it.
>
> Thanks
> Saisai
>
> Sergey Zhemzhitsky  于2019年5月7日周二 上午5:49写道:
>
>> Hi Saisai,
>>
>> Thanks a lot for the link! This is exactly what I need.
>> Just curious why this PR has not been merged, as it seems to implement a
>> rather natural requirement.
>>
>> There are a number of use cases which can benefit from this feature, e.g.
>> - collecting business metrics based on the data's attributes and
>> reporting them into the monitoring system as a side effect of the data
>> processing
>> - visualizing technical metrics by means of alternative software (e.g.
>> grafana) - currently it's hardly possible to know the actual number of
>> jobs, stages, tasks and their names and IDs in advance to register all the
>> corresponding metrics statically.
>>
>>
>> Kind Regards,
>> Sergey
>>
>>
>> On Mon, May 6, 2019, 16:07 Saisai Shao  wrote:
>>
>>> I remembered there was a PR about doing similar thing (
>>> https://github.com/apache/spark/pull/18406). From my understanding,
>>> this seems like a quite specific requirement; it may require code changes
>>> to support your needs.
>>>
>>> Thanks
>>> Saisai
>>>
>>> Sergey Zhemzhitsky  于2019年5月4日周六 下午4:44写道:
>>>
 Hello Spark Users!

 Just wondering whether it is possible to register a metric source
 without metrics known in advance and add the metrics themselves to this
 source later on?

 It seems that currently MetricsSystem puts all the metrics from the
 source's MetricRegistry into the shared MetricRegistry of the MetricsSystem
 during metric source registration [1].

 So if a new metric with a new name is added to the source's registry after
 the source has been registered, this new metric will not be reported to the
 sinks.

 What I'd like to achieve is to be able to register new metrics with new
 names dynamically using a single metric source.
 Is it somehow possible?


 [1]
 https://github.com/apache/spark/blob/51de86baed0776304c6184f2c04b6303ef48df90/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala#L162
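
For reference, a minimal sketch of the pattern under discussion (names are
illustrative; note that org.apache.spark.metrics.source.Source is
private[spark], which is exactly the "not a public API" limitation mentioned
above):

import com.codahale.metrics.{Gauge, MetricRegistry}
import org.apache.spark.metrics.source.Source

// Illustrative custom source. MetricsSystem copies the contents of
// `metricRegistry` into its shared registry at registration time, so a
// gauge added here afterwards is never seen by the configured sinks --
// the behaviour described in [1].
class DynamicSource extends Source {
  override val sourceName: String = "myapp"
  override val metricRegistry: MetricRegistry = new MetricRegistry

  def addGauge(name: String, value: () => Long): Unit =
    metricRegistry.register(name, new Gauge[Long] {
      override def getValue: Long = value()
    })
}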

>>>


userClassPathFirst=true prevents SparkContext from being initialized

2017-01-30 Thread Roberto Coluccio
Hello folks,

I'm trying to work around an issue with some dependencies by specifying at
spark-submit time that I want my (user) classpath to be resolved and taken
into account first, ahead of the jars received through the system classpath
(which is /data/cloudera/parcels/CDH/jars/).

In order to accomplish this, I specify

--conf spark.driver.userClassPathFirst=true
--conf spark.executor.userClassPathFirst=true

and I pass my jars with

--jars 

in my spark-submit command, deploying in yarn cluster mode in a CDH 5.8
environment (Spark 1.6).
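
For reference, the full submission looks roughly like the following sketch
(class, jar, and dependency names are illustrative placeholders):

spark-submit \
  --master yarn --deploy-mode cluster \
  --conf spark.driver.userClassPathFirst=true \
  --conf spark.executor.userClassPathFirst=true \
  --jars /path/to/dep1.jar,/path/to/dep2.jar \
  --class com.example.MyApp \
  my-app.jar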

The list passed with --jars contains several deps, NOT including
Hadoop/Spark-related ones. My app jar is not a fat (uber) one, so it
includes only business classes. None of them contains, for any reason, a
"SparkConf.set("master", "local")" or anything like that.

Without specifying the userClassPathFirst configuration, my App is launched
and completed with no issues at all.

I tried printing logs down to the TRACE level with no luck. I get no
explicit errors, and by adding the "-verbose:class" JVM arg I verified that
Spark-related classes seem to be loaded with no issues. From a rapid
overview of loaded classes, it seems to me that a smaller fraction of
classes is loaded with userClassPathFirst=true than in the default case.
Eventually, my driver's stderr gets stuck logging out:

2017-01-30 10:10:22,308 INFO  ApplicationMaster:58 - Waiting for spark
context initialization ...
2017-01-30 10:10:32,310 INFO  ApplicationMaster:58 - Waiting for spark
context initialization ...
2017-01-30 10:10:42,311 INFO  ApplicationMaster:58 - Waiting for spark
context initialization ...

Eventually, the application is killed by YARN after a timeout.

In my understanding, quoting the doc (
http://spark.apache.org/docs/1.6.2/configuration.html):

[inline image: the spark.driver.userClassPathFirst and
spark.executor.userClassPathFirst entries from the Spark 1.6.2
configuration page]

So I would expect the libs given through the --jars option to be used first,
but I would also expect no issues in loading the system classpath afterwards.
This is confirmed by the logs printed with the "-verbose:class" JVM option,
where I can see logs like:

[Loaded org.apache.spark.SparkContext from
file:/data/cloudera/parcels/CDH-5.8.0-1.cdh5.8.0.p0.42/jars/spark-assembly-1.6.0-cdh5.8.0-hadoop2.6.0-cdh5.8.0.jar]


What am I missing here guys?

Thanks for your help.

Best regards,

Roberto


Re: [Spark 1.5+] ReceiverTracker seems not to stop Kinesis receivers

2016-02-23 Thread Roberto Coluccio
Any chance anyone gave a look at this?

Thanks!

On Wed, Feb 10, 2016 at 10:46 AM, Roberto Coluccio <
roberto.coluc...@gmail.com> wrote:

> Thanks Shixiong!
>
> I'm attaching the thread dumps (I printed the Spark UI after expanding all
> the elements, hope that's fine) and the related stderr (INFO level) executor
> logs. There are 3 of them. The thread dumps were collected at the time the
> StreamingContext was (trying to) shut down, i.e. when I saw the following
> logs in the driver's stderr:
>
> 16/02/10 15:46:25 INFO ApplicationMaster: Final app status: SUCCEEDED, 
> exitCode: 0
> 16/02/10 15:46:25 INFO StreamingContext: Invoking stop(stopGracefully=true) 
> from shutdown hook
> 16/02/10 15:46:25 INFO ReceiverTracker: Sent stop signal to all 3 receivers
> 16/02/10 15:46:35 INFO ReceiverTracker: Waiting for receiver job to terminate 
> gracefully
>
>
> Then, from 15:50 onwards, the driver started reporting logs again as if it
> were continuing to process as usual. You might find some exceptions in the
> executor logs right at the 15:50 timestamp.
>
> Thank you very much in advance!
>
> Roberto
>
>
>
> On Tue, Feb 9, 2016 at 6:25 PM, Shixiong(Ryan) Zhu <
> shixi...@databricks.com> wrote:
>
>> Could you do a thread dump in the executor that runs the Kinesis receiver
>> and post it? It would be great if you could provide the executor log as well.
>>
>> On Tue, Feb 9, 2016 at 3:14 PM, Roberto Coluccio <
>> roberto.coluc...@gmail.com> wrote:
>>
>>> Hello,
>>>
>>> can anybody kindly help me out a little bit here? I just verified the
>>> problem is still there on Spark 1.6.0 and emr-4.3.0 as well. It's
>>> definitely a Kinesis-related issue, since with Spark 1.6.0 I'm successfully
>>> able to get Streaming drivers to terminate with no issue IF I don't use
>>> Kinesis or open any Receivers.
>>>
>>> Thank you!
>>>
>>> Roberto
>>>
>>>
>>> On Tue, Feb 2, 2016 at 4:40 PM, Roberto Coluccio <
>>> roberto.coluc...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> I've been struggling with an issue ever since I tried to upgrade my Spark
>>>> Streaming solution from 1.4.1 to 1.5+.
>>>>
>>>> I have a Spark Streaming app which creates 3 ReceiverInputDStreams
>>>> leveraging KinesisUtils.createStream API.
>>>>
>>>> I used to leverage a timeout to terminate my app
>>>> (StreamingContext.awaitTerminationOrTimeout(timeout)) gracefully (SparkConf
>>>> spark.streaming.stopGracefullyOnShutdown=true).
>>>>
>>>> I used to submit my Spark app on EMR in yarn-cluster mode.
>>>>
>>>> Everything worked fine up to Spark 1.4.1 (on EMR AMI 3.9).
>>>>
>>>> Since I upgraded (tried with Spark 1.5.2 on emr-4.2.0 and Spark 1.6.0
>>>> on emr-4.3.0) I can't get the app to actually terminate. Logs tell me it
>>>> tries to, but no confirmation of receiver stop is received. Instead, when
>>>> the timer gets to the next period, the StreamingContext continues its
>>>> processing for a while (then it gets killed with a SIGTERM 15; YARN's vmem
>>>> and pmem kills are disabled).
>>>>
>>>> ...
>>>>
>>>> 16/02/02 21:22:08 INFO ApplicationMaster: Final app status: SUCCEEDED, 
>>>> exitCode: 0
>>>> 16/02/02 21:22:08 INFO StreamingContext: Invoking 
>>>> stop(stopGracefully=true) from shutdown hook
>>>> 16/02/02 21:22:08 INFO ReceiverTracker: Sent stop signal to all 3 receivers
>>>> 16/02/02 21:22:18 INFO ReceiverTracker: Waiting for receiver job to 
>>>> terminate gracefully
>>>> 16/02/02 21:22:52 INFO ContextCleaner: Cleaned shuffle 141
>>>> 16/02/02 21:22:52 INFO BlockManagerInfo: Removed broadcast_217_piece0 on 
>>>> 172.31.3.140:50152 in memory (size: 23.9 KB, free: 2.1 GB)
>>>> 16/02/02 21:22:52 INFO BlockManagerInfo: Removed broadcast_217_piece0 on 
>>>> ip-172-31-3-141.ec2.internal:41776 in memory (size: 23.9 KB, free: 1224.9 
>>>> MB)
>>>> 16/02/02 21:22:52 INFO BlockManagerInfo: Removed broadcast_217_piece0 on 
>>>> ip-172-31-3-140.ec2.internal:36295 in memory (size: 23.9 KB, free: 1224.0 
>>>> MB)
>>>> 16/02/02 21:22:52 INFO BlockManagerInfo: Removed broadcast_217_piece0 on 
>>>> ip-172-31-3-141.ec2.internal:56428 in memory (size: 23.9 KB, free: 1224.9 
>>>> MB)
>>>> 16/02/02 21:22:52 INFO BlockManagerInfo: Removed broadcast_217_piec

Re: [Spark 1.5+] ReceiverTracker seems not to stop Kinesis receivers

2016-02-09 Thread Roberto Coluccio
Hello,

can anybody kindly help me out a little bit here? I just verified the
problem is still there on Spark 1.6.0 and emr-4.3.0 as well. It's
definitely a Kinesis-related issue, since with Spark 1.6.0 I'm successfully
able to get Streaming drivers to terminate with no issue IF I don't use
Kinesis or open any Receivers.

Thank you!

Roberto


On Tue, Feb 2, 2016 at 4:40 PM, Roberto Coluccio <roberto.coluc...@gmail.com
> wrote:

> Hi,
>
> I've been struggling with an issue ever since I tried to upgrade my Spark
> Streaming solution from 1.4.1 to 1.5+.
>
> I have a Spark Streaming app which creates 3 ReceiverInputDStreams
> leveraging KinesisUtils.createStream API.
>
> I used to leverage a timeout to terminate my app
> (StreamingContext.awaitTerminationOrTimeout(timeout)) gracefully (SparkConf
> spark.streaming.stopGracefullyOnShutdown=true).
>
> I used to submit my Spark app on EMR in yarn-cluster mode.
>
> Everything worked fine up to Spark 1.4.1 (on EMR AMI 3.9).
>
> Since I upgraded (tried with Spark 1.5.2 on emr-4.2.0 and Spark 1.6.0 on
> emr-4.3.0) I can't get the app to actually terminate. Logs tell me it
> tries to, but no confirmation of receiver stop is received. Instead, when
> the timer gets to the next period, the StreamingContext continues its
> processing for a while (then it gets killed with a SIGTERM 15; YARN's vmem
> and pmem kills are disabled).
>
> ...
>
> 16/02/02 21:22:08 INFO ApplicationMaster: Final app status: SUCCEEDED, 
> exitCode: 0
> 16/02/02 21:22:08 INFO StreamingContext: Invoking stop(stopGracefully=true) 
> from shutdown hook
> 16/02/02 21:22:08 INFO ReceiverTracker: Sent stop signal to all 3 receivers
> 16/02/02 21:22:18 INFO ReceiverTracker: Waiting for receiver job to terminate 
> gracefully
> 16/02/02 21:22:52 INFO ContextCleaner: Cleaned shuffle 141
> 16/02/02 21:22:52 INFO BlockManagerInfo: Removed broadcast_217_piece0 on 
> 172.31.3.140:50152 in memory (size: 23.9 KB, free: 2.1 GB)
> 16/02/02 21:22:52 INFO BlockManagerInfo: Removed broadcast_217_piece0 on 
> ip-172-31-3-141.ec2.internal:41776 in memory (size: 23.9 KB, free: 1224.9 MB)
> 16/02/02 21:22:52 INFO BlockManagerInfo: Removed broadcast_217_piece0 on 
> ip-172-31-3-140.ec2.internal:36295 in memory (size: 23.9 KB, free: 1224.0 MB)
> 16/02/02 21:22:52 INFO BlockManagerInfo: Removed broadcast_217_piece0 on 
> ip-172-31-3-141.ec2.internal:56428 in memory (size: 23.9 KB, free: 1224.9 MB)
> 16/02/02 21:22:52 INFO BlockManagerInfo: Removed broadcast_217_piece0 on 
> ip-172-31-3-140.ec2.internal:50542 in memory (size: 23.9 KB, free: 1224.7 MB)
> 16/02/02 21:22:52 INFO ContextCleaner: Cleaned accumulator 184
> 16/02/02 21:22:52 INFO BlockManagerInfo: Removed broadcast_218_piece0 on 
> 172.31.3.140:50152 in memory (size: 3.0 KB, free: 2.1 GB)
> 16/02/02 21:22:52 INFO BlockManagerInfo: Removed broadcast_218_piece0 on 
> ip-172-31-3-141.ec2.internal:41776 in memory (size: 3.0 KB, free: 1224.9 MB)
> 16/02/02 21:22:52 INFO BlockManagerInfo: Removed broadcast_218_piece0 on 
> ip-172-31-3-141.ec2.internal:56428 in memory (size: 3.0 KB, free: 1224.9 MB)
> 16/02/02 21:22:52 INFO BlockManagerInfo: Removed broadcast_218_piece0 on 
> ip-172-31-3-140.ec2.internal:36295 in memory (size: 3.0 KB, free: 1224.0 MB)
> 16/02/02 21:22:52 INFO BlockManagerInfo: Removed broadcast_218_piece0 on 
> ip-172-31-3-140.ec2.internal:50542 in memory (size: 3.0 KB, free: 1224.7 MB)
> 16/02/02 21:25:00 INFO StateDStream: Marking RDD 680 for time 145444830 
> ms for checkpointing
> 16/02/02 21:25:00 INFO StateDStream: Marking RDD 708 for time 145444830 
> ms for checkpointing
> 16/02/02 21:25:00 INFO TransformedDStream: Slicing from 145444800 ms to 
> 145444830 ms (aligned to 145444800 ms and 145444830 ms)
> 16/02/02 21:25:00 INFO StateDStream: Marking RDD 777 for time 145444830 
> ms for checkpointing
> 16/02/02 21:25:00 INFO StateDStream: Marking RDD 801 for time 145444830 
> ms for checkpointing
> 16/02/02 21:25:00 INFO JobScheduler: Added jobs for time 145444830 ms
> 16/02/02 21:25:00 INFO JobGenerator: Checkpointing graph for time 
> 145444830 ms
> 16/02/02 21:25:00 INFO DStreamGraph: Updating checkpoint data for time 
> 145444830 ms
> 16/02/02 21:25:00 INFO JobScheduler: Starting job streaming job 145444830 
> ms.0 from job set of time 145444830 ms
>
> ...
>
>
> Please, this is really blocking the upgrade process to the latest Spark
> versions, and I really don't know how to work around it.
>
> Any help would be very much appreciated.
>
> Thank you,
>
> Roberto
>
>
>


[Spark 1.5+] ReceiverTracker seems not to stop Kinesis receivers

2016-02-02 Thread Roberto Coluccio
Hi,

I've been struggling with an issue ever since I tried to upgrade my Spark
Streaming solution from 1.4.1 to 1.5+.

I have a Spark Streaming app which creates 3 ReceiverInputDStreams
leveraging KinesisUtils.createStream API.

I used to leverage a timeout to terminate my app
(StreamingContext.awaitTerminationOrTimeout(timeout)) gracefully (SparkConf
spark.streaming.stopGracefullyOnShutdown=true).

I used to submit my Spark app on EMR in yarn-cluster mode.

Everything worked fine up to Spark 1.4.1 (on EMR AMI 3.9).

Since I upgraded (tried with Spark 1.5.2 on emr-4.2.0 and Spark 1.6.0 on
emr-4.3.0) I can't get the app to actually terminate. Logs tell me it
tries to, but no confirmation of receiver stop is received. Instead, when
the timer gets to the next period, the StreamingContext continues its
processing for a while (then it gets killed with a SIGTERM 15; YARN's vmem
and pmem kills are disabled).

...

16/02/02 21:22:08 INFO ApplicationMaster: Final app status: SUCCEEDED,
exitCode: 0
16/02/02 21:22:08 INFO StreamingContext: Invoking
stop(stopGracefully=true) from shutdown hook
16/02/02 21:22:08 INFO ReceiverTracker: Sent stop signal to all 3 receivers
16/02/02 21:22:18 INFO ReceiverTracker: Waiting for receiver job to
terminate gracefully
16/02/02 21:22:52 INFO ContextCleaner: Cleaned shuffle 141
16/02/02 21:22:52 INFO BlockManagerInfo: Removed broadcast_217_piece0
on 172.31.3.140:50152 in memory (size: 23.9 KB, free: 2.1 GB)
16/02/02 21:22:52 INFO BlockManagerInfo: Removed broadcast_217_piece0
on ip-172-31-3-141.ec2.internal:41776 in memory (size: 23.9 KB, free:
1224.9 MB)
16/02/02 21:22:52 INFO BlockManagerInfo: Removed broadcast_217_piece0
on ip-172-31-3-140.ec2.internal:36295 in memory (size: 23.9 KB, free:
1224.0 MB)
16/02/02 21:22:52 INFO BlockManagerInfo: Removed broadcast_217_piece0
on ip-172-31-3-141.ec2.internal:56428 in memory (size: 23.9 KB, free:
1224.9 MB)
16/02/02 21:22:52 INFO BlockManagerInfo: Removed broadcast_217_piece0
on ip-172-31-3-140.ec2.internal:50542 in memory (size: 23.9 KB, free:
1224.7 MB)
16/02/02 21:22:52 INFO ContextCleaner: Cleaned accumulator 184
16/02/02 21:22:52 INFO BlockManagerInfo: Removed broadcast_218_piece0
on 172.31.3.140:50152 in memory (size: 3.0 KB, free: 2.1 GB)
16/02/02 21:22:52 INFO BlockManagerInfo: Removed broadcast_218_piece0
on ip-172-31-3-141.ec2.internal:41776 in memory (size: 3.0 KB, free:
1224.9 MB)
16/02/02 21:22:52 INFO BlockManagerInfo: Removed broadcast_218_piece0
on ip-172-31-3-141.ec2.internal:56428 in memory (size: 3.0 KB, free:
1224.9 MB)
16/02/02 21:22:52 INFO BlockManagerInfo: Removed broadcast_218_piece0
on ip-172-31-3-140.ec2.internal:36295 in memory (size: 3.0 KB, free:
1224.0 MB)
16/02/02 21:22:52 INFO BlockManagerInfo: Removed broadcast_218_piece0
on ip-172-31-3-140.ec2.internal:50542 in memory (size: 3.0 KB, free:
1224.7 MB)
16/02/02 21:25:00 INFO StateDStream: Marking RDD 680 for time
145444830 ms for checkpointing
16/02/02 21:25:00 INFO StateDStream: Marking RDD 708 for time
145444830 ms for checkpointing
16/02/02 21:25:00 INFO TransformedDStream: Slicing from 145444800
ms to 145444830 ms (aligned to 145444800 ms and 145444830
ms)
16/02/02 21:25:00 INFO StateDStream: Marking RDD 777 for time
145444830 ms for checkpointing
16/02/02 21:25:00 INFO StateDStream: Marking RDD 801 for time
145444830 ms for checkpointing
16/02/02 21:25:00 INFO JobScheduler: Added jobs for time 145444830 ms
16/02/02 21:25:00 INFO JobGenerator: Checkpointing graph for time
145444830 ms
16/02/02 21:25:00 INFO DStreamGraph: Updating checkpoint data for time
145444830 ms
16/02/02 21:25:00 INFO JobScheduler: Starting job streaming job
145444830 ms.0 from job set of time 145444830 ms

...


Please, this is really blocking the upgrade process to the latest Spark
versions, and I really don't know how to work around it.

Any help would be very much appreciated.

Thank you,

Roberto


Spark 1.5.2 streaming driver in YARN cluster mode on Hadoop 2.6 (on EMR 4.2) restarts after stop

2016-01-14 Thread Roberto Coluccio
Hi there,

I'm facing a weird issue when upgrading from Spark 1.4.1 streaming driver
on EMR 3.9 (hence Hadoop 2.4.0) to Spark 1.5.2 on EMR 4.2 (hence Hadoop
2.6.0).

Basically, the very same driver that used to terminate after a timeout as
expected now does not. In particular, as far as the driver's logs can tell
me, the StreamingContext seems to be stopped with success (and exit code 0),
but the Hadoop/YARN job does not terminate/complete. Instead, after hanging
for a couple of minutes, the driver just seems to start its processing
again! Here follows an example log stack collected during stop.

16/01/12 19:17:32 INFO ApplicationMaster: Final app status: SUCCEEDED,
> exitCode: 0
> 16/01/12 19:17:32 INFO StreamingContext: Invoking
> stop(stopGracefully=true) from shutdown hook
> 16/01/12 19:17:32 INFO ReceiverTracker: Sent stop signal to all 3 receivers
> 16/01/12 19:17:32 ERROR ReceiverTracker: Deregistered receiver for stream
> 1: Stopped by driver
> 16/01/12 19:17:33 ERROR ReceiverTracker: Deregistered receiver for stream
> 2: Stopped by driver
> 16/01/12 19:17:33 INFO TaskSetManager: Finished task 0.0 in stage 8.0 (TID
> 97) in 1200804 ms on ip-172-31-9-4.ec2.internal (1/1)
> 16/01/12 19:17:33 INFO DAGScheduler: ResultStage 8 (start at
> NetflowStreamingApp.scala:68) finished in 1200.806 s
> 16/01/12 19:17:33 INFO YarnClusterScheduler: Removed TaskSet 8.0, whose
> tasks have all completed, from pool
> 16/01/12 19:17:33 ERROR ReceiverTracker: Deregistered receiver for stream
> 0: Stopped by driver
> 16/01/12 19:17:34 INFO TaskSetManager: Finished task 0.0 in stage 12.0
> (TID 101) in 1199753 ms on ip-172-31-9-4.ec2.internal (1/1)
> 16/01/12 19:17:34 INFO YarnClusterScheduler: Removed TaskSet 12.0, whose
> tasks have all completed, from pool
> 16/01/12 19:17:34 INFO DAGScheduler: ResultStage 12 (start at
> NetflowStreamingApp.scala:68) finished in 1199.753 s
> 16/01/12 19:17:34 INFO TaskSetManager: Finished task 0.0 in stage 7.0 (TID
> 96) in 1201854 ms on ip-172-31-9-5.ec2.internal (1/1)
> 16/01/12 19:17:34 INFO DAGScheduler: ResultStage 7 (start at
> NetflowStreamingApp.scala:68) finished in 1201.855 s
> 16/01/12 19:17:34 INFO YarnClusterScheduler: Removed TaskSet 7.0, whose
> tasks have all completed, from pool
> 16/01/12 19:17:34 INFO ReceiverTracker: Waiting for receiver job to
> terminate gracefully
> 16/01/12 19:17:34 INFO ReceiverTracker: Waited for receiver job to
> terminate gracefully
> 16/01/12 19:17:34 INFO ReceiverTracker: All of the receivers have
> deregistered successfully
> 16/01/12 19:17:34 INFO WriteAheadLogManager : Stopped write ahead log
> manager
> 16/01/12 19:17:34 INFO ReceiverTracker: ReceiverTracker stopped
> 16/01/12 19:17:34 INFO JobGenerator: Stopping JobGenerator gracefully
> 16/01/12 19:17:34 INFO JobGenerator: Waiting for all received blocks to be
> consumed for job generation


The "receivers" mentioned in the logs are the Kinesis streams receivers.

In my Scala 2.10 based driver, I just use
StreamingContext.awaitTerminationOrTimeout(timeout) API  (called right
after StreamingContext.start()) and set the
SparkConf spark.streaming.stopGracefullyOnShutdown=true.
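
A minimal sketch of that pattern (batch duration, app name, and timeout are
illustrative; everything else is as described above):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// On timeout, the main method returns, the JVM shutdown hook fires, and
// Spark invokes stop(stopGracefully=true) because of the config below.
val conf = new SparkConf()
  .setAppName("streaming-driver")                          // illustrative
  .set("spark.streaming.stopGracefullyOnShutdown", "true")
val ssc = new StreamingContext(conf, Seconds(60))          // illustrative
// ... create the Kinesis ReceiverInputDStreams and wire the pipeline ...
ssc.start()
ssc.awaitTerminationOrTimeout(timeoutMs)                   // returns after the timeout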

Did anybody experience anything similar?

Any help would be appreciated.

Thanks,

Roberto


Spark Streaming - print accumulators value every period as logs

2015-12-24 Thread Roberto Coluccio
Hello,

I have a batch driver and a streaming driver using the same functions
(Scala). I use accumulators (passed to the functions' constructors) to
count stuff.

In the batch driver, doing so at the right point of the pipeline, I'm able
to retrieve the accumulator value and print it as a log4j log.

In the streaming driver, doing the same yields just nothing. That's
probably because accumulators in the streaming driver are created empty and
the code that prints them is executed only once at the driver (when they are
still empty), when the StreamingContext is started and the DAG is created.

I'm looking for a way to log, at every batch period of my Spark Streaming
driver, the current value of my accumulators. Indeed, I wish to reset such
accumulators at each period so as to have only the counts related to that
period.
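
One workaround sketch for that per-period log-and-reset, using the classic
Spark 1.x accumulator API (log, isError, and input are illustrative
placeholders):

// foreachRDD bodies run on the driver once per batch interval, so the value
// observed here covers the work done for that period.
val errorCount = ssc.sparkContext.accumulator(0L, "errorCount")

val processed = input.map { record =>
  if (isError(record)) errorCount += 1L   // incremented on the executors
  record
}

processed.foreachRDD { rdd =>
  rdd.count()                             // force evaluation of this batch
  log.info(s"errors this period: ${errorCount.value}")
  errorCount.setValue(0L)                 // reset so each period counts alone
}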

Any advice would be really appreciated.

Thanks,
Roberto


Re: Spark on EMR: out-of-the-box solution for real-time application logs monitoring?

2015-12-11 Thread Roberto Coluccio
Thanks for your advice, Steve.

I'm mainly talking about application logs. To be clearer, think for instance
about
"//hadoop/userlogs/application_blablabla/container_blablabla/stderr_or_stdout",
i.e. YARN's application container logs, stored (at least for EMR's Hadoop
2.4) on DataNodes and aggregated/pushed only once the application completes.

"yarn logs" issued from the cluster master doesn't allow you to aggregate
logs on demand for applications that are in a running/active state.

For now I've managed to install the awslogs agent (
http://docs.aws.amazon.com/AmazonCloudWatch/latest/DeveloperGuide/CWL_GettingStarted.html)
on DataNodes so as to push container logs in real time to CloudWatch Logs,
but that's kind of a workaround too. This is why I was wondering what the
community (in general, not only on EMR) uses to monitor application logs in
real time (in an automated fashion) for long-running processes like
streaming drivers, and whether there are out-of-the-box solutions.

Thanks,

Roberto





On Thu, Dec 10, 2015 at 3:06 PM, Steve Loughran <ste...@hortonworks.com>
wrote:

>
> > On 10 Dec 2015, at 14:52, Roberto Coluccio <roberto.coluc...@gmail.com>
> wrote:
> >
> > Hello,
> >
> > I'm investigating a solution to monitor in real time the Spark logs
> produced by my EMR cluster in order to collect statistics and trigger
> alarms. Being on EMR, I found the CloudWatch Logs + Lambda combination
> pretty straightforward and, since I'm on AWS, those services are pretty
> well integrated together... but I could only find examples about using it
> on standalone EC2 instances.
> >
> > In my use case, EMR 3.9 and Spark 1.4.1 drivers running on YARN (cluster
> mode), I would like to be able to monitor Spark logs in real time, not just
> once the processing ends and they are copied to S3. Is there any
> out-of-the-box solution or best practice to accomplish this goal when
> running on EMR that I'm not aware of?
> >
> > Spark logs are written on the Data Nodes' (Core Instances') local file
> systems as YARN container logs, so probably installing the awslogs agent
> on them and pointing it to those logfiles would help push such logs to
> CloudWatch, but I was wondering how the community monitors application
> logs in real time when running Spark on YARN on EMR.
> >
> > Or maybe I'm looking at the wrong solution. Maybe the correct way would be
> using something like a CloudwatchSink so as to make Spark (log4j) push logs
> directly to the sink, with the sink pushing them to CloudWatch (I do like
> the out-of-the-box EMR logging experience, and I want to keep the usual
> eventual log archiving on S3 when the EMR cluster is terminated).
> >
> > Any ideas or experience about this problem?
> >
> > Thank you.
> >
> > Roberto
>
>
> are you talking about event logs as used by the history server, or
> application logs?
>
> the current Spark log server writes events to a file, but as the Hadoop S3
> fs client doesn't write except in close(), they won't be pushed out while
> things are running. Someone (you?) could have a go at implementing a new
> event listener; some stuff that will come out in Spark 2.0 will make it
> easier to wire this up (SPARK-11314), which is coming as part of some work
> on Spark-YARN timeline server integration.
>
> In Hadoop 2.7.1 the log4j logs can be regularly captured by the YARN
> NodeManagers and automatically copied out; look at
> yarn.nodemanager.log-aggregation.roll-monitoring-interval-seconds. For
> that to work you need to set up your log wildcard patterns for the NM to
> locate (i.e. have rolling logs with the right extensions)... the details
> escape me right now.
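
For reference, a sketch of that NodeManager setting in yarn-site.xml (the
interval value is illustrative):

<property>
  <name>yarn.nodemanager.log-aggregation.roll-monitoring-interval-seconds</name>
  <!-- roll up and aggregate running-application logs every hour -->
  <value>3600</value>
</property>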
>
> In earlier versions, you can use "yarn logs" to grab them and pull them
> down.
>
> I don't know anything about cloudwatch integration, sorry
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Spark on EMR: out-of-the-box solution for real-time application logs monitoring?

2015-12-10 Thread Roberto Coluccio
Hello,

I'm investigating a solution to monitor in real time the Spark logs produced
by my EMR cluster in order to collect statistics and trigger alarms. Being on
EMR, I found the CloudWatch Logs + Lambda combination pretty straightforward
and, since I'm on AWS, those services are pretty well integrated together...
but I could only find examples about using it on standalone EC2 instances.

In my use case, EMR 3.9 and Spark 1.4.1 drivers running on YARN (cluster
mode), I would like to be able to monitor Spark logs in real time, not just
once the processing ends and they are copied to S3. Is there any
out-of-the-box solution or best practice to accomplish this goal when
running on EMR that I'm not aware of?

Spark logs are written on the Data Nodes' (Core Instances') local file
systems as YARN container logs, so probably installing the awslogs agent
on them and pointing it to those logfiles would help push such logs to
CloudWatch, but I was wondering how the community monitors application logs
in real time when running Spark on YARN on EMR.

Or maybe I'm looking at the wrong solution. Maybe the correct way would be
using something like a CloudwatchSink so as to make Spark (log4j) push logs
directly to the sink, with the sink pushing them to CloudWatch (I do like
the out-of-the-box EMR logging experience, and I want to keep the usual
eventual log archiving on S3 when the EMR cluster is terminated).

Any ideas or experience about this problem?

Thank you.

Roberto


Fwd: [Spark + Hive + EMR + S3] Issue when reading from Hive external table backed on S3 with large amount of small files

2015-08-07 Thread Roberto Coluccio
Please community, I'd really appreciate your opinion on this topic.

Best regards,
Roberto


-- Forwarded message --
From: Roberto Coluccio roberto.coluc...@gmail.com
Date: Sat, Jul 25, 2015 at 6:28 PM
Subject: [Spark + Hive + EMR + S3] Issue when reading from Hive external
table backed on S3 with large amount of small files
To: user@spark.apache.org


Hello Spark community,

I currently have a Spark 1.3.1 batch driver, deployed in YARN-cluster mode
on an EMR cluster (AMI 3.7.0), that reads input data through a HiveContext,
in particular SELECTing data from an EXTERNAL TABLE backed on S3. Such
table has dynamic partitions and contains *hundreds of small GZip files*.
Since it is currently unfeasible to collate such files on the source side,
I observe that, by default, the SELECT query is mapped by Spark into as many
tasks as there are files found in the table root path (+partitions), e.g.
860 files === 860 tasks to complete the Spark stage of that read operation.

This behaviour obviously creates incredible overhead and often results in
failed stages, due to OOM exceptions and subsequent crashes of the
executors. Regardless of the input size I can manage to handle, I would
really appreciate it if you could suggest how to somehow collate the input
partitions while reading, or at least reduce the number of tasks spawned by
the Hive query.

Looking at
http://docs.aws.amazon.com/ElasticMapReduce/latest/DeveloperGuide/emr-hive-differences.html#emr-hive-gzip-splits
I tried by setting:

hiveContext.sql("set hive.hadoop.supports.splittable.combineinputformat=true")


before creating the external table to read from and querying it, but it
resulted in NO changes. I also tried setting that in the hive-site.xml on
the cluster, but I experienced the same behaviour.
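
For what it's worth, a sketch of one alternative outside Hive: read the small
gzip files directly with Hadoop's CombineTextInputFormat, which packs many
(individually unsplittable) files into each task. This assumes your Hadoop
version ships CombineTextInputFormat; the path and split size are
illustrative:

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat

// Cap each combined split at ~128 MB so several small .gz files share a task.
sc.hadoopConfiguration.set("mapreduce.input.fileinputformat.split.maxsize",
  (128L * 1024 * 1024).toString)

val lines = sc.newAPIHadoopFile(
  "s3://my-bucket/table-root/",            // illustrative path
  classOf[CombineTextInputFormat],
  classOf[LongWritable],
  classOf[Text]
).map(_._2.toString)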

Thanks to whoever can give me any hints.

Best regards,
Roberto


[Spark + Hive + EMR + S3] Issue when reading from Hive external table backed on S3 with large amount of small files

2015-07-25 Thread Roberto Coluccio
Hello Spark community,

I currently have a Spark 1.3.1 batch driver, deployed in YARN-cluster mode
on an EMR cluster (AMI 3.7.0), that reads input data through a HiveContext,
in particular SELECTing data from an EXTERNAL TABLE backed on S3. Such
table has dynamic partitions and contains *hundreds of small GZip files*.
Since it is currently unfeasible to collate such files on the source side,
I observe that, by default, the SELECT query is mapped by Spark into as many
tasks as there are files found in the table root path (+partitions), e.g.
860 files === 860 tasks to complete the Spark stage of that read operation.

This behaviour obviously creates incredible overhead and often results in
failed stages, due to OOM exceptions and subsequent crashes of the
executors. Regardless of the input size I can manage to handle, I would
really appreciate it if you could suggest how to somehow collate the input
partitions while reading, or at least reduce the number of tasks spawned by
the Hive query.

Looking at
http://docs.aws.amazon.com/ElasticMapReduce/latest/DeveloperGuide/emr-hive-differences.html#emr-hive-gzip-splits
I tried by setting:

hiveContext.sql("set hive.hadoop.supports.splittable.combineinputformat=true")


before creating the external table to read from and querying it, but it
resulted in NO changes. I also tried setting that in the hive-site.xml on
the cluster, but I experienced the same behaviour.

Thanks to whoever can give me any hints.

Best regards,
Roberto


Spark 1.3.1 + Hive: write output to CSV with header on S3

2015-07-17 Thread Roberto Coluccio
Hello community,

I'm currently using Spark 1.3.1 with Hive support to output processed data
to an external Hive table backed on S3. I'm manually specifying the
delimiter, but I'd like to know whether there is any clean way to write in
CSV format:

val sparkConf = new SparkConf()

val sc = new SparkContext(sparkConf)

val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)

import hiveContext.implicits._

hiveContext.sql("CREATE EXTERNAL TABLE IF NOT EXISTS table_name(field1
STRING, field2 STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
LOCATION '" + path_on_s3 + "'")

hiveContext.sql(<an INSERT OVERWRITE query to write into the above table>)


I also need the header of the table to be printed on each written file. I
tried with:


hiveContext.sql("set hive.cli.print.header=true")


But it didn't work.
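
One manual workaround sketch: hive.cli.print.header affects the Hive CLI's
result display, not files written by INSERT, so you can instead build the
CSV lines yourself and prepend the header to every partition, making each
output file start with it (names illustrative, and this naive CSV assumes
no embedded commas):

val header = "field1,field2"                        // illustrative
val csvLines = dataFrame.rdd.map(_.mkString(","))   // naive CSV rendering
csvLines
  .mapPartitions(rows => Iterator(header) ++ rows)  // header per partition file
  .saveAsTextFile(path_on_s3)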


Any hint?


Thank you.


Best regards,

Roberto


Re: Spark 1.4 RDD to DF fails with toDF()

2015-06-26 Thread Roberto Coluccio
I got a similar issue. Might yours as well be related to
https://issues.apache.org/jira/browse/SPARK-8368 ?

On Fri, Jun 26, 2015 at 2:00 PM, Akhil Das ak...@sigmoidanalytics.com
wrote:

 Those provided spark libraries are compatible with scala 2.11?

 Thanks
 Best Regards

 On Fri, Jun 26, 2015 at 4:48 PM, Srikanth srikanth...@gmail.com wrote:

 Thanks Akhil for checking this out. Here is my build.sbt.

 name := "Weblog Analysis"

 version := "1.0"

 scalaVersion := "2.11.5"

 javacOptions ++= Seq("-source", "1.7", "-target", "1.7")

 libraryDependencies ++= Seq(
   "org.apache.spark" %% "spark-core" % "1.4.0" % "provided",
   "org.apache.spark" %% "spark-sql" % "1.4.0",
   "org.apache.spark" %% "spark-streaming" % "1.4.0",
   "org.apache.spark" %% "spark-streaming-kafka" % "1.4.0",
   "org.apache.spark" %% "spark-mllib" % "1.4.0",
   "org.apache.commons" % "commons-lang3" % "3.0",
   "org.eclipse.jetty" % "jetty-client" % "8.1.14.v20131031",
   "org.scalatest" %% "scalatest" % "2.2.1" % "test",
   "com.databricks" % "spark-csv_2.11" % "1.0.3",
   "joda-time" % "joda-time" % "2.8.1",
   "org.joda" % "joda-convert" % "1.7"
 )

 resolvers ++= Seq(
   "Sonatype OSS Snapshots" at "http://oss.sonatype.org/content/repositories/snapshots/",
   "Sonatype public" at "http://oss.sonatype.org/content/groups/public/",
   "Sonatype" at "http://nexus.scala-tools.org/content/repositories/public",
   "Scala Tools" at "http://scala-tools.org/repo-snapshots/",
   "Typesafe" at "http://repo.typesafe.com/typesafe/releases/",
   "Akka" at "http://akka.io/repository/",
   "JBoss" at "http://repository.jboss.org/nexus/content/groups/public/",
   "GuiceyFruit" at "http://guiceyfruit.googlecode.com/svn/repo/releases/"
 )
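
As noted elsewhere in this thread, the usual culprit for this
NoSuchMethodError is a Scala version mismatch between the app and the
cluster's Spark build. A sketch of the fix, assuming the cluster runs a
prebuilt Spark 1.4.0 binary (compiled against Scala 2.10):

// Align the app's Scala version (and any _2.1x-suffixed artifacts) with the
// Scala version Spark was built for on the cluster.
scalaVersion := "2.10.4"
libraryDependencies += "com.databricks" % "spark-csv_2.10" % "1.0.3"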

 On Fri, Jun 26, 2015 at 4:13 AM, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 Its a scala version conflict, can you paste your build.sbt file?

 Thanks
 Best Regards

 On Fri, Jun 26, 2015 at 7:05 AM, stati srikanth...@gmail.com wrote:

 Hello,

 When I run a spark job with spark-submit, it fails with the below exception
 for the code line

 val webLogDF = webLogRec.toDF().select("ip", "date", "name")

 I had a similar issue running from spark-shell, then realized that I
 needed sqlContext.implicits._
 Now my code has the following imports:

  import org.apache.spark._
  import org.apache.spark.sql._
  import org.apache.spark.sql.functions._
  val sqlContext = new SQLContext(sc)
  import sqlContext.implicits._

 Code works fine from spark-shell REPL. It also runs fine when run in
 local
 mode from Eclipse. I get this
 error only when I submit to cluster using spark-submit.
 bin/spark-submit /local/weblog-analysis_2.11-1.0.jar --class
 WebLogAnalysis
 --master spark://machu:7077

 I'm testing with spark 1.4. My code was built using scala 2.11 and
 spark+sparkSQL 1.4.0 as dependency in build.sbt

 Exception in thread main java.lang.NoSuchMethodError:

 scala.reflect.api.JavaUniverse.runtimeMirror(Ljava/lang/ClassLoader;)Lscala/reflect/api/JavaUniverse$JavaMirror;
 at WebLogAnalysis$.readWebLogFiles(WebLogAnalysis.scala:38)
 at WebLogAnalysis$.main(WebLogAnalysis.scala:62)
 at WebLogAnalysis.main(WebLogAnalysis.scala)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at

 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
 at

 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:606)
 at

 org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:664)
 at
 org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:169)
 at
 org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:192)
 at
 org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:111)
 at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

 I can provide more code or log if that will help. Let me know.

 Srikanth



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-4-RDD-to-DF-fails-with-toDF-tp23499.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org







Re: java.lang.OutOfMemoryError: PermGen space

2015-06-25 Thread Roberto Coluccio
Glad it worked!

Actually, I got similar issues even with Spark Streaming v1.2.x-based
drivers.

Consider also that the default config in Spark on EMR is 512m!

Roberto


On Thu, Jun 25, 2015 at 1:20 AM, Srikanth srikanth...@gmail.com wrote:

 That worked. Thanks!

  I wonder what changed in 1.4 to cause this. It wouldn't work with anything
  less than 256m for a simple piece of code.
  1.3.1 used to work with the default (64m, I think).

 Srikanth

 On Wed, Jun 24, 2015 at 12:47 PM, Roberto Coluccio 
 roberto.coluc...@gmail.com wrote:

 Did you try to pass it with

 --driver-java-options -XX:MaxPermSize=256m

 as spark-shell input argument?


 Roberto


 On Wed, Jun 24, 2015 at 5:57 PM, stati srikanth...@gmail.com wrote:

 Hello,

 I moved from 1.3.1 to 1.4.0 and started receiving
 java.lang.OutOfMemoryError: PermGen space  when I use spark-shell.
 Same Scala code works fine in 1.3.1 spark-shell. I was loading same set
 of
 external JARs and have same imports in 1.3.1.

 I tried increasing perm size to 256m. I still got OOM.

  SPARK_REPL_OPTS=-XX:MaxPermSize=256m bin/spark-shell --master
  spark://machu:7077 --total-executor-cores 12 --packages
  com.databricks:spark-csv_2.10:1.0.3 --packages joda-time:joda-time:2.8.1

 Spark UI Environment tab didn't show -XX:MaxPermSize. I'm not sure if
 this config was picked up.
 This is standalone mode.

 Any pointers to next step?





 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/java-lang-OutOfMemoryError-PermGen-space-tp23472.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org






Re: java.lang.OutOfMemoryError: PermGen space

2015-06-24 Thread Roberto Coluccio
Did you try to pass it with

--driver-java-options -XX:MaxPermSize=256m

as spark-shell input argument?
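
For example (the master URL is copied from the original report; the rest is
illustrative):

bin/spark-shell --master spark://machu:7077 \
  --driver-java-options "-XX:MaxPermSize=256m" \
  --total-executor-cores 12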


Roberto


On Wed, Jun 24, 2015 at 5:57 PM, stati srikanth...@gmail.com wrote:

 Hello,

 I moved from 1.3.1 to 1.4.0 and started receiving
 java.lang.OutOfMemoryError: PermGen space  when I use spark-shell.
 Same Scala code works fine in 1.3.1 spark-shell. I was loading same set of
 external JARs and have same imports in 1.3.1.

 I tried increasing perm size to 256m. I still got OOM.

 SPARK_REPL_OPTS=-XX:MaxPermSize=256m bin/spark-shell --master
 spark://machu:7077 --total-executor-cores 12 --packages
 com.databricks:spark-csv_2.10:1.0.3 --packages joda-time:joda-time:2.8.1

 Spark UI Environment tab didn't show -XX:MaxPermSize. I'm not sure if
 this config was picked up.
 This is standalone mode.

 Any pointers to next step?





 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/java-lang-OutOfMemoryError-PermGen-space-tp23472.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: [Spark 1.3.1 on YARN on EMR] Unable to instantiate org.apache.hadoop.hive.metastore.HiveMetaStoreClient

2015-06-20 Thread Roberto Coluccio
I confirm,

Christopher was very kind helping me out here. The solution presented in the 
linked doc worked perfectly. IMO it should be linked in the official Spark 
documentation.

Thanks again,

Roberto


 On 20 Jun 2015, at 19:25, Bozeman, Christopher bozem...@amazon.com wrote:
 
 We worked it out. There were multiple items (like the location of the remote
 metastore and DB user auth) to make HiveContext happy in yarn-cluster mode.
 
 For reference 
 https://github.com/awslabs/emr-bootstrap-actions/blob/master/spark/examples/using-hivecontext-yarn-cluster.md
 
 -Christopher Bozeman 
 
 
 On Jun 20, 2015, at 7:24 AM, Andrew Lee alee...@hotmail.com wrote:
 
 Hi Roberto,
 
 I'm not an EMR person, but it looks like option -h is deploying the
 necessary DataNucleus JARs for you.
 The requirement for HiveContext is the hive-site.xml and the DataNucleus
 JARs. As long as these 2 are there, and Spark is compiled with -Phive, it
 should work.
 
 spark-shell runs in yarn-client mode. Not sure whether your other 
 application is running under the same mode or a different one. Try 
 specifying yarn-client mode and see if you get the same result as 
 spark-shell.
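
In yarn-cluster mode, the workaround usually amounts to shipping those pieces
explicitly with the submission; a sketch (JAR paths and versions vary by
install, class/app names illustrative):

spark-submit --master yarn-cluster \
  --jars /usr/lib/spark/lib/datanucleus-api-jdo-3.2.6.jar,/usr/lib/spark/lib/datanucleus-core-3.2.10.jar,/usr/lib/spark/lib/datanucleus-rdbms-3.2.9.jar \
  --files /etc/hive/conf/hive-site.xml \
  --class com.example.StreamingDriver my-driver.jar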
 
 From: roberto.coluc...@gmail.com
 Date: Wed, 10 Jun 2015 14:32:04 +0200
 Subject: [Spark 1.3.1 on YARN on EMR] Unable to instantiate 
 org.apache.hadoop.hive.metastore.HiveMetaStoreClient
 To: user@spark.apache.org
 
 Hi!
 
 I'm struggling with an issue with Spark 1.3.1 running on YARN on an AWS EMR
 cluster. Such cluster is based on AMI 3.7.0 (hence Amazon Linux 2015.03, Hive
 0.13 already installed and configured on the cluster, Hadoop 2.4, etc.). I
 make use of the AWS emr-bootstrap-action install-spark
 (https://github.com/awslabs/emr-bootstrap-actions/tree/master/spark) with the
 option/version -v1.3.1e so as to get the latest Spark for EMR installed and
 available.
 
 I also have a simple Spark Streaming driver in my project. Such driver is 
 part of a larger Maven project: in the pom.xml I'm currently using   
 
 [...]
 
 
 
 <scala.binary.version>2.10</scala.binary.version>
 <scala.version>2.10.4</scala.version>
 <java.version>1.7</java.version>
 <spark.version>1.3.1</spark.version>
 <hadoop.version>2.4.1</hadoop.version>

 [...]

 <dependency>
   <groupId>org.apache.spark</groupId>
   <artifactId>spark-streaming_${scala.binary.version}</artifactId>
   <version>${spark.version}</version>
   <scope>provided</scope>
   <exclusions>
     <exclusion>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-client</artifactId>
     </exclusion>
   </exclusions>
 </dependency>

 <dependency>
   <groupId>org.apache.hadoop</groupId>
   <artifactId>hadoop-client</artifactId>
   <version>${hadoop.version}</version>
   <scope>provided</scope>
 </dependency>

 <dependency>
   <groupId>org.apache.spark</groupId>
   <artifactId>spark-hive_${scala.binary.version}</artifactId>
   <version>${spark.version}</version>
   <scope>provided</scope>
 </dependency>
 
 
 
 
 
 In fact, at compile and build time everything works just fine if, in my 
 driver, I have:
 
 
 
 -
 
 
 
 val sparkConf = new SparkConf()
   .setAppName(appName)
   .set("spark.local.dir", "/tmp/" + appName)
   .set("spark.streaming.unpersist", "true")
   .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
   .registerKryoClasses(Array(classOf[java.net.URI], classOf[String]))

 val sc = new SparkContext(sparkConf)

 val ssc = new StreamingContext(sc, config.batchDuration)
 import org.apache.spark.streaming.StreamingContext._

 ssc.checkpoint(sparkConf.get("spark.local.dir") + checkpointRelativeDir)

 // ... some input reading actions ...

 // ... some input transformation actions ...

 val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
 import sqlContext.implicits._
 sqlContext.sql(<an HiveQL query>)

 ssc.start()
 ssc.awaitTerminationOrTimeout(config.timeout)
 
 
 
 --- 
 
 
 
 What happens is that, right after having been launched, the driver fails
 with the exception:
 
 
 
 15/06/10 11:38:18 ERROR yarn.ApplicationMaster: User class threw exception: 
 java.lang.RuntimeException: Unable to instantiate 
 org.apache.hadoop.hive.metastore.HiveMetaStoreClient
 java.lang.RuntimeException: java.lang.RuntimeException: Unable to 
 instantiate org.apache.hadoop.hive.metastore.HiveMetaStoreClient
 at 
 org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:346)
 at 
 org.apache.spark.sql.hive.HiveContext.sessionState$lzycompute(HiveContext.scala:239)
 at 
 org.apache.spark.sql.hive.HiveContext.sessionState(HiveContext.scala:235)
 at 
 org.apache.spark.sql.hive.HiveContext.hiveconf$lzycompute(HiveContext.scala:251)
 at org.apache.spark.sql.hive.HiveContext.hiveconf(HiveContext.scala:250)
 at org.apache.spark.sql.hive.HiveContext.sql(HiveContext.scala:95)
 at  

[Spark 1.3.1 on YARN on EMR] Unable to instantiate org.apache.hadoop.hive.metastore.HiveMetaStoreClient

2015-06-10 Thread Roberto Coluccio
Hi!

I'm struggling with an issue with Spark 1.3.1 running on YARN on an AWS EMR
cluster. Such cluster is based on AMI 3.7.0 (hence Amazon Linux 2015.03,
Hive 0.13 already installed and configured on the cluster, Hadoop 2.4,
etc.). I make use of the AWS emr-bootstrap-action *install-spark* (
https://github.com/awslabs/emr-bootstrap-actions/tree/master/spark) with the
option/version *-v1.3.1e* so as to get the latest Spark for EMR installed
and available.

I also have a simple Spark Streaming driver in my project. Such driver is
part of a larger Maven project: in the *pom.xml* I'm currently using

[...]


<scala.binary.version>2.10</scala.binary.version>
<scala.version>2.10.4</scala.version>
<java.version>1.7</java.version>
<spark.version>1.3.1</spark.version>
<hadoop.version>2.4.1</hadoop.version>

[...]

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming_${scala.binary.version}</artifactId>
  <version>${spark.version}</version>
  <scope>provided</scope>
  <exclusions>
    <exclusion>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
    </exclusion>
  </exclusions>
</dependency>

<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-client</artifactId>
  <version>${hadoop.version}</version>
  <scope>provided</scope>
</dependency>

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-hive_${scala.binary.version}</artifactId>
  <version>${spark.version}</version>
  <scope>provided</scope>
</dependency>



In fact, at compile and build time everything works just fine if, in my
driver, I have:


-


val sparkConf = new SparkConf()
  .setAppName(appName)
  .set("spark.local.dir", "/tmp/" + appName)
  .set("spark.streaming.unpersist", "true")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .registerKryoClasses(Array(classOf[java.net.URI], classOf[String]))

val sc = new SparkContext(sparkConf)

val ssc = new StreamingContext(sc, config.batchDuration)
import org.apache.spark.streaming.StreamingContext._

ssc.checkpoint(sparkConf.get("spark.local.dir") + checkpointRelativeDir)

// ... some input reading actions ...

// ... some input transformation actions ...

val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
import sqlContext.implicits._
sqlContext.sql(<an HiveQL query>)

ssc.start()
ssc.awaitTerminationOrTimeout(config.timeout)



---


What happens is that, right after having been launched, the driver fails
with the exception:


15/06/10 11:38:18 ERROR yarn.ApplicationMaster: User class threw
exception: java.lang.RuntimeException: Unable to instantiate
org.apache.hadoop.hive.metastore.HiveMetaStoreClient
java.lang.RuntimeException: java.lang.RuntimeException: Unable to
instantiate org.apache.hadoop.hive.metastore.HiveMetaStoreClient
at 
org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:346)
at 
org.apache.spark.sql.hive.HiveContext.sessionState$lzycompute(HiveContext.scala:239)
at org.apache.spark.sql.hive.HiveContext.sessionState(HiveContext.scala:235)
at 
org.apache.spark.sql.hive.HiveContext.hiveconf$lzycompute(HiveContext.scala:251)
at org.apache.spark.sql.hive.HiveContext.hiveconf(HiveContext.scala:250)
at org.apache.spark.sql.hive.HiveContext.sql(HiveContext.scala:95)
at  myDriver.scala:  line of the sqlContext.sql(query) 
Caused by  some stuff 
Caused by: javax.jdo.JDOFatalUserException: Class
org.datanucleus.api.jdo.JDOPersistenceManagerFactory was not found.
NestedThrowables:
java.lang.ClassNotFoundException:
org.datanucleus.api.jdo.JDOPersistenceManagerFactory
...
Caused by: java.lang.ClassNotFoundException:
org.datanucleus.api.jdo.JDOPersistenceManagerFactory


Suspecting a wrong Hive installation/configuration or libs/classpath
definition, I SSHed into the cluster and launched a *spark-shell*.
Excluding the app configuration and StreamingContext usage/definition, I
then carried out all the actions listed in the driver implementation, in
particular all the Hive-related ones, and they all went through smoothly!


I also tried to use the optional *-h* argument (
https://github.com/awslabs/emr-bootstrap-actions/blob/master/spark/README.md#arguments-optional)
in the install-spark emr-bootstrap-action, but the driver failed the very
same way. Furthermore, when launching a spark-shell (on the EMR cluster
with Spark installed with the -h option), I also got:


15/06/09 14:20:51 WARN conf.HiveConf: hive-default.xml not found on CLASSPATH
15/06/09 14:20:52 INFO metastore.HiveMetaStore: 0: Opening raw store
with implemenation class:org.apache.hadoop.hive.metastore.ObjectStore
15/06/09 14:20:52 INFO metastore.ObjectStore: ObjectStore, initialize called
15/06/09 14:20:52 WARN DataNucleus.General: Plugin (Bundle)
org.datanucleus is already registered. Ensure you dont have multiple
JAR versions 

Spark SQL weird exception after upgrading from 1.1.1 to 1.2.x

2015-03-18 Thread Roberto Coluccio
Hi everybody,

When trying to upgrade from Spark 1.1.1 to Spark 1.2.x (tried both 1.2.0
and 1.2.1) I encounter a weird error that never occurred before, about
which I'd kindly ask for any possible help.

 In particular, all my Spark SQL queries fail with the following exception:

java.lang.RuntimeException: [1.218] failure: identifier expected

 [my query listed]
   ^
   at scala.sys.package$.error(package.scala:27)
   at
 org.apache.spark.sql.catalyst.AbstractSparkSQLParser.apply(SparkSQLParser.scala:33)
   at org.apache.spark.sql.SQLContext$$anonfun$1.apply(SQLContext.scala:79)
   at org.apache.spark.sql.SQLContext$$anonfun$1.apply(SQLContext.scala:79)
   at
 org.apache.spark.sql.catalyst.SparkSQLParser$$anonfun$org$apache$spark$sql$catalyst$SparkSQLParser$$others$1.apply(SparkSQLParser.scala:174)
   at
 org.apache.spark.sql.catalyst.SparkSQLParser$$anonfun$org$apache$spark$sql$catalyst$SparkSQLParser$$others$1.apply(SparkSQLParser.scala:173)
   at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:136)
   at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:135)
   at
 scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
   at
 scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
   ...



The unit tests I've got for testing this stuff fail both if I build+test
the project with Maven and if I run them as single ScalaTest files or test
suites/packages.

When running my app as usual on EMR in YARN-cluster mode, I get the
following:

15/03/17 11:32:14 INFO yarn.ApplicationMaster: Final app status:
FAILED, exitCode: 15, (reason: User class threw exception: [1.218]
failure: identifier expected

SELECT * FROM ... (my query)



^)
Exception in thread Driver java.lang.RuntimeException: [1.218]
failure: identifier expected

SELECT * FROM ... (my query)



   ^
at scala.sys.package$.error(package.scala:27)
at 
org.apache.spark.sql.catalyst.AbstractSparkSQLParser.apply(SparkSQLParser.scala:33)
at org.apache.spark.sql.SQLContext$$anonfun$1.apply(SQLContext.scala:79)
at org.apache.spark.sql.SQLContext$$anonfun$1.apply(SQLContext.scala:79)
at 
org.apache.spark.sql.catalyst.SparkSQLParser$$anonfun$org$apache$spark$sql$catalyst$SparkSQLParser$$others$1.apply(SparkSQLParser.scala:174)
at 
org.apache.spark.sql.catalyst.SparkSQLParser$$anonfun$org$apache$spark$sql$catalyst$SparkSQLParser$$others$1.apply(SparkSQLParser.scala:173)
at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:136)
at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:135)
at 
scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
at 
scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
at 
scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1$$anonfun$apply$2.apply(Parsers.scala:254)
at 
scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1$$anonfun$apply$2.apply(Parsers.scala:254)
at scala.util.parsing.combinator.Parsers$Failure.append(Parsers.scala:202)
at 
scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
at 
scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
at 
scala.util.parsing.combinator.Parsers$$anon$2$$anonfun$apply$14.apply(Parsers.scala:891)
at 
scala.util.parsing.combinator.Parsers$$anon$2$$anonfun$apply$14.apply(Parsers.scala:891)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at scala.util.parsing.combinator.Parsers$$anon$2.apply(Parsers.scala:890)
at 
scala.util.parsing.combinator.PackratParsers$$anon$1.apply(PackratParsers.scala:110)
at 
org.apache.spark.sql.catalyst.AbstractSparkSQLParser.apply(SparkSQLParser.scala:31)
at 
org.apache.spark.sql.SQLContext$$anonfun$parseSql$1.apply(SQLContext.scala:83)
at 
org.apache.spark.sql.SQLContext$$anonfun$parseSql$1.apply(SQLContext.scala:83)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.sql.SQLContext.parseSql(SQLContext.scala:83)
at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:303)
at mycompany.mypackage.MyClassFunction.apply(MyClassFunction.scala:34)
at mycompany.mypackage.MyClass$.main(MyClass.scala:254)
at mycompany.mypackage.MyClass.main(MyClass.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at 
org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:441)
15/03/17 11:32:14 INFO 

Re: Spark SQL weird exception after upgrading from 1.1.1 to 1.2.x

2015-03-18 Thread Roberto Coluccio
You know, I actually have one of the columns called timestamp! This may
really be causing the problem reported in the bug you linked, I guess.
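
If so, one interim workaround sketch (until the linked fix lands) is simply
to avoid the reserved word, e.g. by renaming the backing case-class field;
the new name is illustrative:

// The temp table's columns come from the case-class field names, so renaming
// the field removes the reserved word from the generated schema.
case class MyCaseClassM(column1: String, eventTimestamp: String) // was `timestamp`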

On Wed, Mar 18, 2015 at 3:37 PM, Cheng Lian lian.cs@gmail.com wrote:

  I suspect that you hit this bug:
 https://issues.apache.org/jira/browse/SPARK-6250; it depends on the
 actual contents of your query.

 Yin had opened a PR for this; although not merged yet, it should be a
 valid fix: https://github.com/apache/spark/pull/5078

 This fix will be included in 1.3.1.

 Cheng

 On 3/18/15 10:04 PM, Roberto Coluccio wrote:

 Hi Cheng, thanks for your reply.

  The query is something like:

  SELECT * FROM (
    SELECT m.column1, IF (d.columnA IS NOT null, d.columnA, m.column2),
  ..., m.columnN FROM tableD d RIGHT OUTER JOIN tableM m ON m.column2 =
  d.columnA WHERE m.column2 != "None" AND d.columnA != ""
    UNION ALL
    SELECT ... [another SELECT statement with different conditions but same
  tables]
    UNION ALL
    SELECT ... [another SELECT statement with different conditions but same
  tables]
  ) a


  I'm using just sqlContext, no hiveContext. Please, note once again that
 this perfectly worked w/ Spark 1.1.x.

  The tables, i.e. tableD and tableM are previously registered with the
 RDD.registerTempTable method, where the input RDDs are actually a
 RDD[MyCaseClassM/D], with MyCaseClassM and MyCaseClassD being simple case
 classes with only (and less than 22) String fields.

  Hope the situation is a bit clearer. Thanks to anyone who can help me
  out here.

  Roberto



 On Wed, Mar 18, 2015 at 12:09 PM, Cheng Lian lian.cs@gmail.com
 wrote:

  Would you mind providing the query? If it's confidential, could you
  please help construct a query that reproduces this issue?

 Cheng

 On 3/18/15 6:03 PM, Roberto Coluccio wrote:

 Hi everybody,

  When trying to upgrade from Spark 1.1.1 to Spark 1.2.x (tried both
  1.2.0 and 1.2.1) I encounter a weird error that never occurred before,
  about which I'd kindly ask for any possible help.

   In particular, all my Spark SQL queries fail with the following
 exception:

  java.lang.RuntimeException: [1.218] failure: identifier expected

 [my query listed]
   ^
   at scala.sys.package$.error(package.scala:27)
   at org.apache.spark.sql.catalyst.AbstractSparkSQLParser.apply(SparkSQLParser.scala:33)
   at org.apache.spark.sql.SQLContext$$anonfun$1.apply(SQLContext.scala:79)
   at org.apache.spark.sql.SQLContext$$anonfun$1.apply(SQLContext.scala:79)
   at org.apache.spark.sql.catalyst.SparkSQLParser$$anonfun$org$apache$spark$sql$catalyst$SparkSQLParser$$others$1.apply(SparkSQLParser.scala:174)
   at org.apache.spark.sql.catalyst.SparkSQLParser$$anonfun$org$apache$spark$sql$catalyst$SparkSQLParser$$others$1.apply(SparkSQLParser.scala:173)
   at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:136)
   at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:135)
   at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
   at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
   ...



  The unit tests I've got for this stuff fail both if I
 build+test the project with Maven and if I run them as single ScalaTest
 files or test suites/packages.

  When running my app as usual on EMR in YARN-cluster mode, I get the
 following:

 15/03/17 11:32:14 INFO yarn.ApplicationMaster: Final app status: FAILED, exitCode: 15, (reason: User class threw exception: [1.218] failure: identifier expected

 SELECT * FROM ... (my query)
                              ^)
 Exception in thread "Driver" java.lang.RuntimeException: [1.218] failure: identifier expected

 SELECT * FROM ... (my query)
                              ^
 at scala.sys.package$.error(package.scala:27)
 at org.apache.spark.sql.catalyst.AbstractSparkSQLParser.apply(SparkSQLParser.scala:33)
 at org.apache.spark.sql.SQLContext$$anonfun$1.apply(SQLContext.scala:79)
 at org.apache.spark.sql.SQLContext$$anonfun$1.apply(SQLContext.scala:79)
 at org.apache.spark.sql.catalyst.SparkSQLParser$$anonfun$org$apache$spark$sql$catalyst$SparkSQLParser$$others$1.apply(SparkSQLParser.scala:174)
 at org.apache.spark.sql.catalyst.SparkSQLParser$$anonfun$org$apache$spark$sql$catalyst$SparkSQLParser$$others$1.apply(SparkSQLParser.scala:173)
 at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:136)
 at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:135)
 at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)

Re: Spark SQL weird exception after upgrading from 1.1.1 to 1.2.x

2015-03-18 Thread Roberto Coluccio
Hi Cheng, thanks for your reply.

The query is something like:

SELECT * FROM (
   SELECT m.column1, IF (d.columnA IS NOT null, d.columnA, m.column2), ...,
 m.columnN FROM tableD d RIGHT OUTER JOIN tableM m on m.column2 = d.columnA
 WHERE m.column2 != "None" AND d.columnA != ""
   UNION ALL
   SELECT ... [another SELECT statement with different conditions but same
 tables]
   UNION ALL
   SELECT ... [another SELECT statement with different conditions but same
 tables]
 ) a


I'm using just sqlContext, no hiveContext. Please note, once again, that
this worked perfectly with Spark 1.1.x.

The tables, i.e. tableD and tableM, are registered beforehand with the
RDD.registerTempTable method; the input RDDs are actually
RDD[MyCaseClassM] and RDD[MyCaseClassD], with MyCaseClassM and MyCaseClassD
being simple case classes with only (and fewer than 22) String fields.
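
For reference, the registration amounts to something like the following (a
minimal sketch for Spark 1.2.x with made-up field names; "sc" is assumed to
be the SparkContext and rddM/rddD the input RDDs, and the real case classes
have more String columns):

  import org.apache.spark.sql.SQLContext

  // Simple case classes with only String fields (fewer than 22).
  case class MyCaseClassM(column1: String, column2: String)
  case class MyCaseClassD(columnA: String)

  val sqlContext = new SQLContext(sc)
  // Brings the implicit RDD[Product] -> SchemaRDD conversion into scope,
  // which is what makes registerTempTable available on plain RDDs.
  import sqlContext.createSchemaRDD

  rddM.registerTempTable("tableM") // rddM: RDD[MyCaseClassM]
  rddD.registerTempTable("tableD") // rddD: RDD[MyCaseClassD]

  val result = sqlContext.sql("SELECT * FROM ( ... ) a") // the query above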

Hope the situation is a bit clearer. Thanks to anyone who can help me out
here.

Roberto



On Wed, Mar 18, 2015 at 12:09 PM, Cheng Lian lian.cs@gmail.com wrote:

  Would you mind providing the query? If it's confidential, could you
 please help by constructing a query that reproduces this issue?

 Cheng

 On 3/18/15 6:03 PM, Roberto Coluccio wrote:

 Hi everybody,

  When trying to upgrade from Spark 1.1.1 to Spark 1.2.x (I tried both 1.2.0
 and 1.2.1) I encounter a weird error that never occurred before, about which
 I'd kindly ask for any possible help.

   In particular, all my Spark SQL queries fail with the following
 exception:

  java.lang.RuntimeException: [1.218] failure: identifier expected

 [my query listed]
   ^
   at scala.sys.package$.error(package.scala:27)
   at org.apache.spark.sql.catalyst.AbstractSparkSQLParser.apply(SparkSQLParser.scala:33)
   at org.apache.spark.sql.SQLContext$$anonfun$1.apply(SQLContext.scala:79)
   at org.apache.spark.sql.SQLContext$$anonfun$1.apply(SQLContext.scala:79)
   at org.apache.spark.sql.catalyst.SparkSQLParser$$anonfun$org$apache$spark$sql$catalyst$SparkSQLParser$$others$1.apply(SparkSQLParser.scala:174)
   at org.apache.spark.sql.catalyst.SparkSQLParser$$anonfun$org$apache$spark$sql$catalyst$SparkSQLParser$$others$1.apply(SparkSQLParser.scala:173)
   at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:136)
   at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:135)
   at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
   at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
   ...



  The unit tests I've got for this stuff fail both if I build+test
 the project with Maven and if I run them as single ScalaTest files or test
 suites/packages.

  When running my app as usual on EMR in YARN-cluster mode, I get the
 following:

 15/03/17 11:32:14 INFO yarn.ApplicationMaster: Final app status: FAILED, exitCode: 15, (reason: User class threw exception: [1.218] failure: identifier expected

 SELECT * FROM ... (my query)
                              ^)
 Exception in thread "Driver" java.lang.RuntimeException: [1.218] failure: identifier expected

 SELECT * FROM ... (my query)
                              ^
 at scala.sys.package$.error(package.scala:27)
 at org.apache.spark.sql.catalyst.AbstractSparkSQLParser.apply(SparkSQLParser.scala:33)
 at org.apache.spark.sql.SQLContext$$anonfun$1.apply(SQLContext.scala:79)
 at org.apache.spark.sql.SQLContext$$anonfun$1.apply(SQLContext.scala:79)
 at org.apache.spark.sql.catalyst.SparkSQLParser$$anonfun$org$apache$spark$sql$catalyst$SparkSQLParser$$others$1.apply(SparkSQLParser.scala:174)
 at org.apache.spark.sql.catalyst.SparkSQLParser$$anonfun$org$apache$spark$sql$catalyst$SparkSQLParser$$others$1.apply(SparkSQLParser.scala:173)
 at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:136)
 at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:135)
 at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
 at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
 at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
 at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1$$anonfun$apply$2.apply(Parsers.scala:254)
 at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1$$anonfun$apply$2.apply(Parsers.scala:254)
 at scala.util.parsing.combinator.Parsers$Failure.append(Parsers.scala:202)
 at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)

Re: Spark SQL weird exception after upgrading from 1.1.1 to 1.2.x

2015-03-18 Thread Roberto Coluccio
Hey Cheng, thank you so much for your suggestion: the problem was actually
a column/field called timestamp in one of the case classes!! Once I
changed its name, everything worked out fine again. Let me say it was kinda
frustrating ...
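
For anyone hitting the same issue, the fix boiled down to renaming the
offending field. A minimal sketch (the field names here are made up, and the
real case class has more String fields):

  // Trips the Spark 1.2.x parser: "timestamp" is treated as a reserved
  // word (see SPARK-6250), so queries against a table registered from
  // this class fail with "failure: identifier expected".
  case class EventBroken(timestamp: String, value: String)

  // Parses fine: the field name is no longer a reserved word.
  case class EventFixed(eventTime: String, value: String)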

Roberto

On Wed, Mar 18, 2015 at 4:07 PM, Roberto Coluccio 
roberto.coluc...@gmail.com wrote:

  You know, I actually have one of the columns called timestamp! This may
 really be the cause of the problem reported in the bug you linked, I guess.

 On Wed, Mar 18, 2015 at 3:37 PM, Cheng Lian lian.cs@gmail.com wrote:

   I suspect that you hit this bug:
  https://issues.apache.org/jira/browse/SPARK-6250. Whether you do depends
  on the actual contents of your query.

  Yin has opened a PR for this; although not merged yet, it should be a
 valid fix: https://github.com/apache/spark/pull/5078

 This fix will be included in 1.3.1.

 Cheng

 On 3/18/15 10:04 PM, Roberto Coluccio wrote:

 Hi Cheng, thanks for your reply.

  The query is something like:

  SELECT * FROM (
   SELECT m.column1, IF (d.columnA IS NOT null, d.columnA, m.column2),
 ..., m.columnN FROM tableD d RIGHT OUTER JOIN tableM m on m.column2 =
  d.columnA WHERE m.column2 != "None" AND d.columnA != ""
   UNION ALL
   SELECT ... [another SELECT statement with different conditions but
 same tables]
   UNION ALL
   SELECT ... [another SELECT statement with different conditions but
 same tables]
 ) a


  I'm using just sqlContext, no hiveContext. Please note, once again, that
 this worked perfectly with Spark 1.1.x.

  The tables, i.e. tableD and tableM, are registered beforehand with the
 RDD.registerTempTable method; the input RDDs are actually
 RDD[MyCaseClassM] and RDD[MyCaseClassD], with MyCaseClassM and MyCaseClassD
 being simple case classes with only (and fewer than 22) String fields.

  Hope the situation is a bit clearer. Thanks to anyone who can help me
 out here.

  Roberto



 On Wed, Mar 18, 2015 at 12:09 PM, Cheng Lian lian.cs@gmail.com
 wrote:

  Would you mind providing the query? If it's confidential, could you
 please help by constructing a query that reproduces this issue?

 Cheng

 On 3/18/15 6:03 PM, Roberto Coluccio wrote:

 Hi everybody,

  When trying to upgrade from Spark 1.1.1 to Spark 1.2.x (I tried both
 1.2.0 and 1.2.1) I encounter a weird error that never occurred before,
 about which I'd kindly ask for any possible help.

   In particular, all my Spark SQL queries fail with the following
 exception:

  java.lang.RuntimeException: [1.218] failure: identifier expected

 [my query listed]
   ^
   at scala.sys.package$.error(package.scala:27)
   at org.apache.spark.sql.catalyst.AbstractSparkSQLParser.apply(SparkSQLParser.scala:33)
   at org.apache.spark.sql.SQLContext$$anonfun$1.apply(SQLContext.scala:79)
   at org.apache.spark.sql.SQLContext$$anonfun$1.apply(SQLContext.scala:79)
   at org.apache.spark.sql.catalyst.SparkSQLParser$$anonfun$org$apache$spark$sql$catalyst$SparkSQLParser$$others$1.apply(SparkSQLParser.scala:174)
   at org.apache.spark.sql.catalyst.SparkSQLParser$$anonfun$org$apache$spark$sql$catalyst$SparkSQLParser$$others$1.apply(SparkSQLParser.scala:173)
   at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:136)
   at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:135)
   at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
   at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
   ...



  The unit tests I've got for this stuff fail both if I
 build+test the project with Maven and if I run them as single ScalaTest
 files or test suites/packages.

  When running my app as usual on EMR in YARN-cluster mode, I get the
 following:

 15/03/17 11:32:14 INFO yarn.ApplicationMaster: Final app status: FAILED, exitCode: 15, (reason: User class threw exception: [1.218] failure: identifier expected

 SELECT * FROM ... (my query)
                              ^)
 Exception in thread "Driver" java.lang.RuntimeException: [1.218] failure: identifier expected

 SELECT * FROM ... (my query)
                              ^
 at scala.sys.package$.error(package.scala:27)
 at org.apache.spark.sql.catalyst.AbstractSparkSQLParser.apply(SparkSQLParser.scala:33)
 at org.apache.spark.sql.SQLContext$$anonfun$1.apply(SQLContext.scala:79)
 at org.apache.spark.sql.SQLContext$$anonfun$1.apply(SQLContext.scala:79)
 at org.apache.spark.sql.catalyst.SparkSQLParser$$anonfun$org$apache$spark$sql$catalyst$SparkSQLParser$$others$1.apply(SparkSQLParser.scala:174)
 at org.apache.spark.sql.catalyst.SparkSQLParser

Spark UI port issue when deploying Spark driver on YARN in yarn-cluster mode on EMR

2014-12-23 Thread Roberto Coluccio
Hello folks,

I'm trying to deploy a Spark driver on Amazon EMR in yarn-cluster mode,
expecting to be able to access the Spark UI at the spark-master-ip:4040
address (the default port). The problem is that the Spark UI port is always
assigned randomly at runtime, even though I tried to specify it in the
spark-defaults.conf file: to do so, I followed
https://github.com/awslabs/emr-bootstrap-actions/tree/master/spark#3-utilize-an-emr-step-to-configure-the-spark-default-configuration-optional
, setting spark.ui.port to a static, known value. No luck: every time I
launch a Spark driver (using the spark-submit script from the yarn-master
node), the UI port is chosen randomly.
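
For completeness, the programmatic equivalent of what I set in
spark-defaults.conf would be something like this (a minimal sketch, with
"MyApp" as a placeholder app name):

  import org.apache.spark.SparkConf

  val conf = new SparkConf()
    .setAppName("MyApp")
    .set("spark.ui.port", "4040") // static, known UI port

  // Note: if the configured port is already bound, Spark retries on
  // successive ports (see spark.port.maxRetries), so the effective UI
  // port can still differ from the configured one.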

Are there any configurations I'm missing here?

Thank you very much.

Roberto


Access resources from jar-local resources folder

2014-09-23 Thread Roberto Coluccio
Hello folks,

I have a Spark Streaming application built with Maven (as a jar) and deployed
with the spark-submit script. The application project has the following
(main) structure:

myApp
  src
    main
      scala
        com.mycompany.package
          MyApp.scala
          DoSomething.scala
          ...
      resources
        aPerlScript.pl
        ...
    test
      scala
        com.mycompany.package
          MyAppTest.scala
      ...
  target
    ...
  pom.xml


In the DoSomething.scala object I have a method (let's call it
doSomething()) that tries to run a Perl script, taken from the resources
folder, as an external scala.sys.process.Process. I then call
DoSomething.doSomething(). OK, here's the *issue*: I was not able to access
that script, not with absolute paths, relative paths,
getClass.getClassLoader.getResource, or getClass.getResource, and I have
specified the resources folder in my pom.xml... None of my attempts
succeeded: I don't know how to find the stuff I put in src/main/resources.

I will appreciate any help.
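
One direction I'm considering (a sketch, not verified here): since entries
packaged in a jar are not addressable as regular filesystem paths, stream
the resource out to a temporary file and execute that. Assuming the script
ends up at the jar root as /aPerlScript.pl:

  import java.io.File
  import java.nio.file.{Files, StandardCopyOption}
  import scala.sys.process._

  // Resources inside a jar are entries, not files, so getResource-style
  // lookups cannot yield a usable filesystem path. Copy the entry to a
  // temp file first, then run it as an external process.
  val in = getClass.getResourceAsStream("/aPerlScript.pl")
  val tmp = File.createTempFile("aPerlScript", ".pl")
  tmp.deleteOnExit()
  Files.copy(in, tmp.toPath, StandardCopyOption.REPLACE_EXISTING)
  in.close()
  val exitCode = Seq("perl", tmp.getAbsolutePath).!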

SIDE NOTES:

   - I use an external Process instead of a Spark pipe because, at this
   step of my workflow, I must handle binary files as input and output.
   - I'm using spark-streaming 1.1.0, Scala 2.10.4 and Java 7. I build the
   jar with Maven install from within Eclipse (Kepler).
   - When I use the standard getClass.getClassLoader.getResource method
   to access resources, I find that the actual classpath is that of the
   spark-submit script.


Thank you and best regards,
Roberto