Re: Dynamic metric names
It would be a dream to have an easy-to-use dynamic metric system AND a reliable counting system (accumulator-like) in Spark... Thanks Roberto On Tue, May 7, 2019 at 3:54 AM Saisai Shao wrote: > I think the main reason why that was not merged is that Spark itself > doesn't have such a requirement, and the metrics system is mainly used for > Spark itself. Most of the needs come from custom sources/sinks, but > Spark's MetricsSystem is not designed as a public API. > > I think we could revisit or improve that PR if there's a solid reason > for it. > > Thanks > Saisai > > On Tue, May 7, 2019 at 5:49 AM Sergey Zhemzhitsky wrote: > >> Hi Saisai, >> >> Thanks a lot for the link! This is exactly what I need. >> Just curious: why hasn't this PR been merged, as it seems to implement >> a rather natural requirement? >> >> There are a number of use cases which can benefit from this feature, e.g. >> - collecting business metrics based on the data's attributes and >> reporting them to the monitoring system as a side effect of the data >> processing >> - visualizing technical metrics by means of alternative software (e.g. >> Grafana) - currently it's hardly possible to know the actual number of >> jobs, stages, tasks and their names and IDs in advance to register all the >> corresponding metrics statically. >> >> >> Kind Regards, >> Sergey >> >> >> On Mon, May 6, 2019, 16:07 Saisai Shao wrote: >> >>> I remember there was a PR doing a similar thing ( >>> https://github.com/apache/spark/pull/18406). From my understanding, >>> this seems like a quite specific requirement; it may require code changes >>> to support your needs. >>> >>> Thanks >>> Saisai >>> >>> On Sat, May 4, 2019 at 4:44 PM Sergey Zhemzhitsky wrote: >>> Hello Spark Users! Just wondering whether it is possible to register a metric source without metrics known in advance and add the metrics themselves to this source later on?
It seems that currently MetricsSystem puts all the metrics from the source's MetricRegistry into the shared MetricRegistry of the MetricsSystem during metric source registration [1]. So if a new metric with a new name is added to the source's registry after the source has been registered, this new metric will not be reported to the sinks. What I'd like to achieve is to be able to register new metrics with new names dynamically using a single metric source. Is that somehow possible? [1] https://github.com/apache/spark/blob/51de86baed0776304c6184f2c04b6303ef48df90/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala#L162 >>>
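To make the registration-time copy concrete, here is a toy model in plain Scala. It is not Spark's code: as far as I can tell from the linked line, MetricsSystem.registerSource hands the source's Dropwizard MetricRegistry to the shared registry's register method, which treats it as a MetricSet and copies the metrics it contains at that moment. SourceRegistry, SharedRegistry and the metric names are hypothetical stand-ins for that behaviour.

```scala
import scala.collection.mutable

// Toy model, NOT Spark's implementation: SourceRegistry and SharedRegistry are
// hypothetical stand-ins for a source's MetricRegistry and the shared one.
object SnapshotRegistryDemo {
  type Gauge = () => Long

  final class SourceRegistry {
    val metrics: mutable.LinkedHashMap[String, Gauge] = mutable.LinkedHashMap.empty
  }

  final class SharedRegistry {
    private val metrics = mutable.LinkedHashMap.empty[String, Gauge]

    // Copies the entries the source holds right now; metrics added to the
    // source later are never seen here, so anything reporting from this
    // registry will not report them.
    def register(prefix: String, source: SourceRegistry): Unit =
      source.metrics.foreach { case (name, gauge) => metrics(s"$prefix.$name") = gauge }

    def names: List[String] = metrics.keys.toList
  }

  def main(args: Array[String]): Unit = {
    val source = new SourceRegistry
    source.metrics("recordsRead") = () => 42L

    val shared = new SharedRegistry
    shared.register("mySource", source)

    // Added after registration, like a dynamically named metric.
    source.metrics("records.customerA") = () => 7L

    println(shared.names) // prints List(mySource.recordsRead)
  }
}
```

In this model the shared registry only sees later metrics if the source is registered again, or if it keeps a reference to the source instead of copying its entries.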
userClassPathFirst=true prevents SparkContext from being initialized
Hello folks, I'm trying to work around an issue with some dependencies by specifying at spark-submit time that I want my (user) classpath to be resolved and taken into account first (over the jars received through the system classpath, which is /data/cloudera/parcels/CDH/jars/). In order to accomplish this, I specify --conf spark.driver.userClassPathFirst=true --conf spark.executor.userClassPathFirst=true and I pass my jars with --jars in my spark-submit command, deploying in yarn cluster mode in a CDH 5.8 environment (Spark 1.6). The list passed with --jars contains several dependencies, NOT including Hadoop/Spark-related ones. My app jar is not a fat (uber) one, so it includes only business classes. None of these classes calls, for any reason, "SparkConf.set("master", "local")" or anything like that. Without the userClassPathFirst configuration, my app is launched and completes with no issues at all. I tried printing logs down to the TRACE level with no luck. I get no explicit errors, and by adding the "-verbose:class" JVM arg I verified that Spark-related classes seem to be loaded with no issues. From a quick overview of the loaded classes, it seems to me that only a small fraction of the classes is loaded with userClassPathFirst=true compared to the default case. Eventually, my driver's stderr gets stuck logging: 2017-01-30 10:10:22,308 INFO ApplicationMaster:58 - Waiting for spark context initialization ... 2017-01-30 10:10:32,310 INFO ApplicationMaster:58 - Waiting for spark context initialization ... 2017-01-30 10:10:42,311 INFO ApplicationMaster:58 - Waiting for spark context initialization ... In the end, the application is killed by YARN after a timeout. From my understanding of the docs ( http://spark.apache.org/docs/1.6.2/configuration.html), I would expect the libs given through the --jars option to be used first, but I also expect no issues in loading the system classpath afterwards.
This is confirmed by the logs printed with the "-verbose:class" JVM option, where I can see logs like: [Loaded org.apache.spark.SparkContext from file:/data/cloudera/parcels/CDH-5.8.0-1.cdh5.8.0.p0.42/jars/spark-assembly-1.6.0-cdh5.8.0-hadoop2.6.0-cdh5.8.0.jar] What am I missing here guys? Thanks for your help. Best regards, Roberto
Re: [Spark 1.5+] ReceiverTracker seems not to stop Kinesis receivers
Has anyone had a chance to look at this? Thanks! On Wed, Feb 10, 2016 at 10:46 AM, Roberto Coluccio < roberto.coluc...@gmail.com> wrote: > Thanks Shixiong! > > I'm attaching the thread dumps (I printed the Spark UI after expanding all > the elements, hope that's fine) and the related stderr (INFO level) executor > logs. There are 3 of them. The thread dumps were collected at the time the > StreamingContext was (trying to) shut down, i.e. when I saw the following > logs in the driver's stderr: > > 16/02/10 15:46:25 INFO ApplicationMaster: Final app status: SUCCEEDED, > exitCode: 0 > 16/02/10 15:46:25 INFO StreamingContext: Invoking stop(stopGracefully=true) > from shutdown hook > 16/02/10 15:46:25 INFO ReceiverTracker: Sent stop signal to all 3 receivers > 16/02/10 15:46:35 INFO ReceiverTracker: Waiting for receiver job to terminate > gracefully > > > Then, from 15:50 onward, the driver started reporting logs again as if it > were continuing to process as usual. You might find some exceptions in the > executor logs with exactly the 15:50 timestamp. > > Thank you very much in advance! > > Roberto > > > > On Tue, Feb 9, 2016 at 6:25 PM, Shixiong(Ryan) Zhu < > shixi...@databricks.com> wrote: > >> Could you do a thread dump in the executor that runs the Kinesis receiver >> and post it? It would be great if you could provide the executor log as well. >> >> On Tue, Feb 9, 2016 at 3:14 PM, Roberto Coluccio < >> roberto.coluc...@gmail.com> wrote: >> >>> Hello, >>> >>> can anybody kindly help me out a little bit here? I just verified the >>> problem is still there on Spark 1.6.0 and emr-4.3.0 as well. It's >>> definitely a Kinesis-related issue, since with Spark 1.6.0 I'm successfully >>> able to get Streaming drivers to terminate with no issue IF I don't use >>> Kinesis and don't open any Receivers. >>> >>> Thank you!
>>> >>> Roberto >>> >>> >>> On Tue, Feb 2, 2016 at 4:40 PM, Roberto Coluccio < >>> roberto.coluc...@gmail.com> wrote: >>> >>>> Hi, >>>> >>>> I've been struggling with an issue ever since I tried to upgrade my Spark >>>> Streaming solution from 1.4.1 to 1.5+. >>>> >>>> I have a Spark Streaming app which creates 3 ReceiverInputDStreams >>>> leveraging the KinesisUtils.createStream API. >>>> >>>> I used to leverage a timeout to terminate my app >>>> (StreamingContext.awaitTerminationOrTimeout(timeout)) gracefully (SparkConf >>>> spark.streaming.stopGracefullyOnShutdown=true). >>>> >>>> I used to submit my Spark app on EMR in yarn-cluster mode. >>>> >>>> Everything worked fine up to Spark 1.4.1 (on EMR AMI 3.9). >>>> >>>> Since I upgraded (tried with Spark 1.5.2 on emr-4.2.0 and Spark 1.6.0 >>>> on emr-4.3.0) I can't get the app to actually terminate. The logs tell me it >>>> tries to, but no confirmation that the receivers have stopped is ever received. Instead, when >>>> the timer gets to the next period, the StreamingContext continues its >>>> processing for a while (then it gets killed with SIGTERM 15; YARN's vmem >>>> and pmem kills are disabled). >>>> >>>> ...
>>>> >>>> 16/02/02 21:22:08 INFO ApplicationMaster: Final app status: SUCCEEDED, >>>> exitCode: 0 >>>> 16/02/02 21:22:08 INFO StreamingContext: Invoking >>>> stop(stopGracefully=true) from shutdown hook >>>> 16/02/02 21:22:08 INFO ReceiverTracker: Sent stop signal to all 3 receivers >>>> 16/02/02 21:22:18 INFO ReceiverTracker: Waiting for receiver job to >>>> terminate gracefully >>>> 16/02/02 21:22:52 INFO ContextCleaner: Cleaned shuffle 141 >>>> 16/02/02 21:22:52 INFO BlockManagerInfo: Removed broadcast_217_piece0 on >>>> 172.31.3.140:50152 in memory (size: 23.9 KB, free: 2.1 GB) >>>> 16/02/02 21:22:52 INFO BlockManagerInfo: Removed broadcast_217_piece0 on >>>> ip-172-31-3-141.ec2.internal:41776 in memory (size: 23.9 KB, free: 1224.9 >>>> MB) >>>> 16/02/02 21:22:52 INFO BlockManagerInfo: Removed broadcast_217_piece0 on >>>> ip-172-31-3-140.ec2.internal:36295 in memory (size: 23.9 KB, free: 1224.0 >>>> MB) >>>> 16/02/02 21:22:52 INFO BlockManagerInfo: Removed broadcast_217_piece0 on >>>> ip-172-31-3-141.ec2.internal:56428 in memory (size: 23.9 KB, free: 1224.9 >>>> MB) >>>> 16/02/02 21:22:52 INFO BlockManagerInfo: Removed broadcast_217_piec
Re: [Spark 1.5+] ReceiverTracker seems not to stop Kinesis receivers
Hello, can anybody kindly help me out a little bit here? I just verified the problem is still there on Spark 1.6.0 and emr-4.3.0 as well. It's definitely a Kinesis-related issue, since with Spark 1.6.0 I'm successfully able to get Streaming drivers to terminate with no issue IF I don't use Kinesis and don't open any Receivers. Thank you! Roberto On Tue, Feb 2, 2016 at 4:40 PM, Roberto Coluccio <roberto.coluc...@gmail.com > wrote: > Hi, > > I've been struggling with an issue ever since I tried to upgrade my Spark > Streaming solution from 1.4.1 to 1.5+. > > I have a Spark Streaming app which creates 3 ReceiverInputDStreams > leveraging the KinesisUtils.createStream API. > > I used to leverage a timeout to terminate my app > (StreamingContext.awaitTerminationOrTimeout(timeout)) gracefully (SparkConf > spark.streaming.stopGracefullyOnShutdown=true). > > I used to submit my Spark app on EMR in yarn-cluster mode. > > Everything worked fine up to Spark 1.4.1 (on EMR AMI 3.9). > > Since I upgraded (tried with Spark 1.5.2 on emr-4.2.0 and Spark 1.6.0 on > emr-4.3.0) I can't get the app to actually terminate. The logs tell me it > tries to, but no confirmation that the receivers have stopped is ever received. Instead, when > the timer gets to the next period, the StreamingContext continues its > processing for a while (then it gets killed with SIGTERM 15; YARN's vmem > and pmem kills are disabled). > > ...
> > 16/02/02 21:22:08 INFO ApplicationMaster: Final app status: SUCCEEDED, > exitCode: 0 > 16/02/02 21:22:08 INFO StreamingContext: Invoking stop(stopGracefully=true) > from shutdown hook > 16/02/02 21:22:08 INFO ReceiverTracker: Sent stop signal to all 3 receivers > 16/02/02 21:22:18 INFO ReceiverTracker: Waiting for receiver job to terminate > gracefully > 16/02/02 21:22:52 INFO ContextCleaner: Cleaned shuffle 141 > 16/02/02 21:22:52 INFO BlockManagerInfo: Removed broadcast_217_piece0 on > 172.31.3.140:50152 in memory (size: 23.9 KB, free: 2.1 GB) > 16/02/02 21:22:52 INFO BlockManagerInfo: Removed broadcast_217_piece0 on > ip-172-31-3-141.ec2.internal:41776 in memory (size: 23.9 KB, free: 1224.9 MB) > 16/02/02 21:22:52 INFO BlockManagerInfo: Removed broadcast_217_piece0 on > ip-172-31-3-140.ec2.internal:36295 in memory (size: 23.9 KB, free: 1224.0 MB) > 16/02/02 21:22:52 INFO BlockManagerInfo: Removed broadcast_217_piece0 on > ip-172-31-3-141.ec2.internal:56428 in memory (size: 23.9 KB, free: 1224.9 MB) > 16/02/02 21:22:52 INFO BlockManagerInfo: Removed broadcast_217_piece0 on > ip-172-31-3-140.ec2.internal:50542 in memory (size: 23.9 KB, free: 1224.7 MB) > 16/02/02 21:22:52 INFO ContextCleaner: Cleaned accumulator 184 > 16/02/02 21:22:52 INFO BlockManagerInfo: Removed broadcast_218_piece0 on > 172.31.3.140:50152 in memory (size: 3.0 KB, free: 2.1 GB) > 16/02/02 21:22:52 INFO BlockManagerInfo: Removed broadcast_218_piece0 on > ip-172-31-3-141.ec2.internal:41776 in memory (size: 3.0 KB, free: 1224.9 MB) > 16/02/02 21:22:52 INFO BlockManagerInfo: Removed broadcast_218_piece0 on > ip-172-31-3-141.ec2.internal:56428 in memory (size: 3.0 KB, free: 1224.9 MB) > 16/02/02 21:22:52 INFO BlockManagerInfo: Removed broadcast_218_piece0 on > ip-172-31-3-140.ec2.internal:36295 in memory (size: 3.0 KB, free: 1224.0 MB) > 16/02/02 21:22:52 INFO BlockManagerInfo: Removed broadcast_218_piece0 on > ip-172-31-3-140.ec2.internal:50542 in memory (size: 3.0 KB, free: 1224.7 MB) > 
16/02/02 21:25:00 INFO StateDStream: Marking RDD 680 for time 145444830 > ms for checkpointing > 16/02/02 21:25:00 INFO StateDStream: Marking RDD 708 for time 145444830 > ms for checkpointing > 16/02/02 21:25:00 INFO TransformedDStream: Slicing from 145444800 ms to > 145444830 ms (aligned to 145444800 ms and 145444830 ms) > 16/02/02 21:25:00 INFO StateDStream: Marking RDD 777 for time 145444830 > ms for checkpointing > 16/02/02 21:25:00 INFO StateDStream: Marking RDD 801 for time 145444830 > ms for checkpointing > 16/02/02 21:25:00 INFO JobScheduler: Added jobs for time 145444830 ms > 16/02/02 21:25:00 INFO JobGenerator: Checkpointing graph for time > 145444830 ms > 16/02/02 21:25:00 INFO DStreamGraph: Updating checkpoint data for time > 145444830 ms > 16/02/02 21:25:00 INFO JobScheduler: Starting job streaming job 145444830 > ms.0 from job set of time 145444830 ms > > ... > > > Please, this is really blocking the upgrade to the latest Spark > versions and I really don't know how to work around it. > > Any help would be very much appreciated. > > Thank you, > > Roberto > > >
[Spark 1.5+] ReceiverTracker seems not to stop Kinesis receivers
Hi, I've been struggling with an issue ever since I tried to upgrade my Spark Streaming solution from 1.4.1 to 1.5+. I have a Spark Streaming app which creates 3 ReceiverInputDStreams leveraging the KinesisUtils.createStream API. I used to leverage a timeout to terminate my app (StreamingContext.awaitTerminationOrTimeout(timeout)) gracefully (SparkConf spark.streaming.stopGracefullyOnShutdown=true). I used to submit my Spark app on EMR in yarn-cluster mode. Everything worked fine up to Spark 1.4.1 (on EMR AMI 3.9). Since I upgraded (tried with Spark 1.5.2 on emr-4.2.0 and Spark 1.6.0 on emr-4.3.0) I can't get the app to actually terminate. The logs tell me it tries to, but no confirmation that the receivers have stopped is ever received. Instead, when the timer gets to the next period, the StreamingContext continues its processing for a while (then it gets killed with SIGTERM 15; YARN's vmem and pmem kills are disabled). ... 16/02/02 21:22:08 INFO ApplicationMaster: Final app status: SUCCEEDED, exitCode: 0 16/02/02 21:22:08 INFO StreamingContext: Invoking stop(stopGracefully=true) from shutdown hook 16/02/02 21:22:08 INFO ReceiverTracker: Sent stop signal to all 3 receivers 16/02/02 21:22:18 INFO ReceiverTracker: Waiting for receiver job to terminate gracefully 16/02/02 21:22:52 INFO ContextCleaner: Cleaned shuffle 141 16/02/02 21:22:52 INFO BlockManagerInfo: Removed broadcast_217_piece0 on 172.31.3.140:50152 in memory (size: 23.9 KB, free: 2.1 GB) 16/02/02 21:22:52 INFO BlockManagerInfo: Removed broadcast_217_piece0 on ip-172-31-3-141.ec2.internal:41776 in memory (size: 23.9 KB, free: 1224.9 MB) 16/02/02 21:22:52 INFO BlockManagerInfo: Removed broadcast_217_piece0 on ip-172-31-3-140.ec2.internal:36295 in memory (size: 23.9 KB, free: 1224.0 MB) 16/02/02 21:22:52 INFO BlockManagerInfo: Removed broadcast_217_piece0 on ip-172-31-3-141.ec2.internal:56428 in memory (size: 23.9 KB, free: 1224.9 MB) 16/02/02 21:22:52 INFO BlockManagerInfo: Removed broadcast_217_piece0 on
ip-172-31-3-140.ec2.internal:50542 in memory (size: 23.9 KB, free: 1224.7 MB) 16/02/02 21:22:52 INFO ContextCleaner: Cleaned accumulator 184 16/02/02 21:22:52 INFO BlockManagerInfo: Removed broadcast_218_piece0 on 172.31.3.140:50152 in memory (size: 3.0 KB, free: 2.1 GB) 16/02/02 21:22:52 INFO BlockManagerInfo: Removed broadcast_218_piece0 on ip-172-31-3-141.ec2.internal:41776 in memory (size: 3.0 KB, free: 1224.9 MB) 16/02/02 21:22:52 INFO BlockManagerInfo: Removed broadcast_218_piece0 on ip-172-31-3-141.ec2.internal:56428 in memory (size: 3.0 KB, free: 1224.9 MB) 16/02/02 21:22:52 INFO BlockManagerInfo: Removed broadcast_218_piece0 on ip-172-31-3-140.ec2.internal:36295 in memory (size: 3.0 KB, free: 1224.0 MB) 16/02/02 21:22:52 INFO BlockManagerInfo: Removed broadcast_218_piece0 on ip-172-31-3-140.ec2.internal:50542 in memory (size: 3.0 KB, free: 1224.7 MB) 16/02/02 21:25:00 INFO StateDStream: Marking RDD 680 for time 145444830 ms for checkpointing 16/02/02 21:25:00 INFO StateDStream: Marking RDD 708 for time 145444830 ms for checkpointing 16/02/02 21:25:00 INFO TransformedDStream: Slicing from 145444800 ms to 145444830 ms (aligned to 145444800 ms and 145444830 ms) 16/02/02 21:25:00 INFO StateDStream: Marking RDD 777 for time 145444830 ms for checkpointing 16/02/02 21:25:00 INFO StateDStream: Marking RDD 801 for time 145444830 ms for checkpointing 16/02/02 21:25:00 INFO JobScheduler: Added jobs for time 145444830 ms 16/02/02 21:25:00 INFO JobGenerator: Checkpointing graph for time 145444830 ms 16/02/02 21:25:00 INFO DStreamGraph: Updating checkpoint data for time 145444830 ms 16/02/02 21:25:00 INFO JobScheduler: Starting job streaming job 145444830 ms.0 from job set of time 145444830 ms ... Please, this is really blocking the upgrade to the latest Spark versions and I really don't know how to work around it. Any help would be very much appreciated. Thank you, Roberto
Spark 1.5.2 streaming driver in YARN cluster mode on Hadoop 2.6 (on EMR 4.2) restarts after stop
Hi there, I'm facing a weird issue after upgrading from a Spark 1.4.1 streaming driver on EMR 3.9 (hence Hadoop 2.4.0) to Spark 1.5.2 on EMR 4.2 (hence Hadoop 2.6.0). Basically, the very same driver that used to terminate after a timeout as expected now does not. In particular, as far as the driver's logs can tell, the StreamingContext seems to be stopped successfully (with exit code 0), but the Hadoop/YARN job does not terminate/complete. Instead, after hanging for a couple of minutes, the driver just seems to start its processing again! Here is an example of the logs collected during the stop: 16/01/12 19:17:32 INFO ApplicationMaster: Final app status: SUCCEEDED, > exitCode: 0 > 16/01/12 19:17:32 INFO StreamingContext: Invoking > stop(stopGracefully=true) from shutdown hook > 16/01/12 19:17:32 INFO ReceiverTracker: Sent stop signal to all 3 receivers > 16/01/12 19:17:32 ERROR ReceiverTracker: Deregistered receiver for stream > 1: Stopped by driver > 16/01/12 19:17:33 ERROR ReceiverTracker: Deregistered receiver for stream > 2: Stopped by driver > 16/01/12 19:17:33 INFO TaskSetManager: Finished task 0.0 in stage 8.0 (TID > 97) in 1200804 ms on ip-172-31-9-4.ec2.internal (1/1) > 16/01/12 19:17:33 INFO DAGScheduler: ResultStage 8 (start at > NetflowStreamingApp.scala:68) finished in 1200.806 s > 16/01/12 19:17:33 INFO YarnClusterScheduler: Removed TaskSet 8.0, whose > tasks have all completed, from pool > 16/01/12 19:17:33 ERROR ReceiverTracker: Deregistered receiver for stream > 0: Stopped by driver > 16/01/12 19:17:34 INFO TaskSetManager: Finished task 0.0 in stage 12.0 > (TID 101) in 1199753 ms on ip-172-31-9-4.ec2.internal (1/1) > 16/01/12 19:17:34 INFO YarnClusterScheduler: Removed TaskSet 12.0, whose > tasks have all completed, from pool > 16/01/12 19:17:34 INFO DAGScheduler: ResultStage 12 (start at > NetflowStreamingApp.scala:68) finished in 1199.753 s > 16/01/12 19:17:34 INFO TaskSetManager: Finished task 0.0 in stage 7.0 (TID > 96) in 1201854 ms on
ip-172-31-9-5.ec2.internal (1/1) > 16/01/12 19:17:34 INFO DAGScheduler: ResultStage 7 (start at > NetflowStreamingApp.scala:68) finished in 1201.855 s > 16/01/12 19:17:34 INFO YarnClusterScheduler: Removed TaskSet 7.0, whose > tasks have all completed, from pool > 16/01/12 19:17:34 INFO ReceiverTracker: Waiting for receiver job to > terminate gracefully > 16/01/12 19:17:34 INFO ReceiverTracker: Waited for receiver job to > terminate gracefully > 16/01/12 19:17:34 INFO ReceiverTracker: All of the receivers have > deregistered successfully > 16/01/12 19:17:34 INFO WriteAheadLogManager : Stopped write ahead log > manager > 16/01/12 19:17:34 INFO ReceiverTracker: ReceiverTracker stopped > 16/01/12 19:17:34 INFO JobGenerator: Stopping JobGenerator gracefully > 16/01/12 19:17:34 INFO JobGenerator: Waiting for all received blocks to be > consumed for job generation The "receivers" mentioned in the logs are the Kinesis streams receivers. In my Scala 2.10 based driver, I just use StreamingContext.awaitTerminationOrTimeout(timeout) API (called right after StreamingContext.start()) and set the SparkConf spark.streaming.stopGracefullyOnShutdown=true. Did anybody experience anything similar? Any help would be appreciated. Thanks, Roberto
Spark Streaming - print accumulator values every period as logs
Hello, I have a batch driver and a streaming driver that use the same functions (Scala). I use accumulators (passed to the functions' constructors) to count stuff. In the batch driver, by doing so at the right point of the pipeline, I'm able to retrieve the accumulator value and print it as a log4j log. In the streaming driver, doing the same produces nothing at all. That's probably because the accumulators in the streaming driver are created empty, and the code that prints them is executed only once on the driver (while they are still empty), when the StreamingContext is started and the DAG is created. I'm looking for a way to log the current value of my accumulators at every batch period of my Spark Streaming driver. I would also like to reset the accumulators at each period, so as to only have the counts related to that period. Any advice would be really appreciated. Thanks, Roberto
Re: Spark on EMR: out-of-the-box solution for real-time application logs monitoring?
Thanks for your advice, Steve. I'm mainly talking about application logs. To be clearer, think for instance about "//hadoop/userlogs/application_blablabla/container_blablabla/stderr_or_stdout", i.e. the YARN application container logs, which are stored (at least for EMR's Hadoop 2.4) on the DataNodes and aggregated/pushed only once the application completes. "yarn logs" issued from the cluster Master doesn't let you aggregate, on demand, the logs of applications that are in a running/active state. For now I managed to install the awslogs agent ( http://docs.aws.amazon.com/AmazonCloudWatch/latest/DeveloperGuide/CWL_GettingStarted.html) on the DataNodes so as to push container logs to CloudWatch Logs in real time, but that's kind of a workaround too. This is why I was wondering what the community (in general, not only on EMR) uses to monitor application logs in real time (in an automated fashion) for long-running processes like streaming drivers, and whether there are out-of-the-box solutions. Thanks, Roberto On Thu, Dec 10, 2015 at 3:06 PM, Steve Loughran <ste...@hortonworks.com> wrote: > > > On 10 Dec 2015, at 14:52, Roberto Coluccio <roberto.coluc...@gmail.com> > wrote: > > > > Hello, > > > > I'm investigating a solution to monitor in real time the Spark logs produced > by my EMR cluster in order to collect statistics and trigger alarms. Being > on EMR, I found CloudWatch Logs + Lambda pretty straightforward and, > since I'm on AWS, those services are well integrated with each other, but I > could only find examples of using them on standalone EC2 instances. > > > > In my use case, EMR 3.9 and Spark 1.4.1 drivers running on YARN (cluster > mode), I would like to be able to monitor Spark logs in real time, not just > when the processing ends and they are copied to S3. Is there any > out-of-the-box solution or best practice for accomplishing this goal when > running on EMR that I'm not aware of?
> > > > Spark logs are written to the DataNodes' (Core Instances) local file > systems as YARN container logs, so installing the awslogs agent > on them and pointing it at those log files would probably help push such logs to > CloudWatch, but I was wondering how the community monitors > application logs in real time when running Spark on YARN on EMR. > > > > Or maybe I'm looking at the wrong solution. Maybe the correct way would be > using something like a CloudwatchSink, so as to make Spark (log4j) push logs > directly to the sink and the sink push them to CloudWatch (I do like the > out-of-the-box EMR logging experience and I want to keep the usual > log archiving on S3 when the EMR cluster is terminated). > > > > Any ideas or experience with this problem? > > > > Thank you. > > > > Roberto > > > are you talking about event logs as used by the history server, or > application logs? > > the current spark log server writes events to a file, but as the hadoop s3 > fs client doesn't write except in close(), they won't be pushed out while > things are running. Someone (you?) could have a go at implementing a new > event listener; some stuff that will come out in Spark 2.0 will make it > easier to wire this up (SPARK-11314), which is coming as part of some work > on spark-YARN timelineserver integration. > > In Hadoop 2.7.1 the log4j logs can be regularly captured by the Yarn > Nodemanagers and automatically copied out, look at > yarn.nodemanager.log-aggregation.roll-monitoring-interval-seconds . For > that to work you need to set up your log wildcard patterns for the NM to > locate (i.e. have rolling logs with the right extensions)...the details > escape me right now > > In earlier versions, you can use "yarn logs" to grab them and pull them > down. > > I don't know anything about cloudwatch integration, sorry > > - > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org > For additional commands, e-mail: user-h...@spark.apache.org > >
Spark on EMR: out-of-the-box solution for real-time application logs monitoring?
Hello, I'm investigating a solution to monitor in real time the Spark logs produced by my EMR cluster in order to collect statistics and trigger alarms. Being on EMR, I found CloudWatch Logs + Lambda pretty straightforward and, since I'm on AWS, those services are well integrated with each other, but I could only find examples of using them on standalone EC2 instances. In my use case, EMR 3.9 and Spark 1.4.1 drivers running on YARN (cluster mode), I would like to be able to monitor Spark logs in real time, not just when the processing ends and they are copied to S3. Is there any out-of-the-box solution or best practice for accomplishing this goal when running on EMR that I'm not aware of? Spark logs are written to the DataNodes' (Core Instances) local file systems as YARN container logs, so installing the awslogs agent on them and pointing it at those log files would probably help push such logs to CloudWatch, but I was wondering how the community monitors application logs in real time when running Spark on YARN on EMR. Or maybe I'm looking at the wrong solution. Maybe the correct way would be using something like a CloudwatchSink, so as to make Spark (log4j) push logs directly to the sink and the sink push them to CloudWatch (I do like the out-of-the-box EMR logging experience and I want to keep the usual log archiving on S3 when the EMR cluster is terminated). Any ideas or experience with this problem? Thank you. Roberto
Fwd: [Spark + Hive + EMR + S3] Issue when reading from Hive external table backed on S3 with large amount of small files
Dear community, I'd really appreciate your opinion on this topic. Best regards, Roberto -- Forwarded message -- From: Roberto Coluccio roberto.coluc...@gmail.com Date: Sat, Jul 25, 2015 at 6:28 PM Subject: [Spark + Hive + EMR + S3] Issue when reading from Hive external table backed on S3 with large amount of small files To: user@spark.apache.org Hello Spark community, I currently have a Spark 1.3.1 batch driver, deployed in YARN-cluster mode on an EMR cluster (AMI 3.7.0), that reads input data through a HiveContext, in particular SELECTing data from an EXTERNAL TABLE backed by S3. The table has dynamic partitions and contains *hundreds of small GZip files*. Since collating these files on the source side is currently unfeasible, I see that, by default, Spark maps the SELECT query into as many tasks as there are files in the table root path (+ partitions), e.g. 860 files = 860 tasks to complete the Spark stage of that read operation. This behaviour creates a huge overhead and often results in failed stages due to OOM exceptions and subsequent executor crashes. Regardless of the input size I can manage to handle, I would really appreciate it if you could suggest how to somehow collate the input partitions while reading or, at least, reduce the number of tasks spawned by the Hive query. Looking at http://docs.aws.amazon.com/ElasticMapReduce/latest/DeveloperGuide/emr-hive-differences.html#emr-hive-gzip-splits I tried setting: hiveContext.sql("set hive.hadoop.supports.splittable.combineinputformat=true") before creating the external table and querying it, but it resulted in NO changes. I also tried setting it in hive-site.xml on the cluster, but experienced the same behaviour. Thanks to whoever can give me any hints. Best regards, Roberto
[Spark + Hive + EMR + S3] Issue when reading from Hive external table backed on S3 with large amount of small files
Hello Spark community, I currently have a Spark 1.3.1 batch driver, deployed in YARN-cluster mode on an EMR cluster (AMI 3.7.0), that reads input data through a HiveContext, in particular SELECTing data from an EXTERNAL TABLE backed by S3. The table has dynamic partitions and contains *hundreds of small GZip files*. Since collating these files on the source side is currently unfeasible, I see that, by default, Spark maps the SELECT query into as many tasks as there are files in the table root path (+ partitions), e.g. 860 files = 860 tasks to complete the Spark stage of that read operation. This behaviour creates a huge overhead and often results in failed stages due to OOM exceptions and subsequent executor crashes. Regardless of the input size I can manage to handle, I would really appreciate it if you could suggest how to somehow collate the input partitions while reading or, at least, reduce the number of tasks spawned by the Hive query. Looking at http://docs.aws.amazon.com/ElasticMapReduce/latest/DeveloperGuide/emr-hive-differences.html#emr-hive-gzip-splits I tried setting: hiveContext.sql("set hive.hadoop.supports.splittable.combineinputformat=true") before creating the external table and querying it, but it resulted in NO changes. I also tried setting it in hive-site.xml on the cluster, but experienced the same behaviour. Thanks to whoever can give me any hints. Best regards, Roberto
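The combine-input-format setting mentioned above is about letting one task read several small files instead of one task per file. Below is a toy sketch of that grouping step in plain Scala, only to illustrate the idea: the object name, the greedy next-fit strategy and the 128 MB limit are illustrative assumptions, not what Hive or Hadoop actually implement. Since a GZip file cannot be split, a file larger than the limit simply becomes a group of its own.

```scala
// Toy sketch, NOT Hive's or Hadoop's implementation: greedily pack small files
// into groups of at most maxBytes, so that one task could read a whole group.
object SmallFilePacking {
  def pack(files: Seq[(String, Long)], maxBytes: Long): Vector[Vector[String]] = {
    val groups = Vector.newBuilder[Vector[String]]
    var current = Vector.empty[String]
    var currentBytes = 0L
    for ((path, size) <- files) {
      // Close the current group if this file would push it past the limit.
      // A file larger than maxBytes still ends up alone in its own group,
      // since a GZip file cannot be split.
      if (current.nonEmpty && currentBytes + size > maxBytes) {
        groups += current
        current = Vector.empty[String]
        currentBytes = 0L
      }
      current = current :+ path
      currentBytes += size
    }
    if (current.nonEmpty) groups += current
    groups.result()
  }

  def main(args: Array[String]): Unit = {
    // Hypothetical input: 860 GZip files of 1 MB each (the post's file count).
    val files = (1 to 860).map(i => (s"part-$i.gz", 1024L * 1024L))
    val groups = pack(files, 128L * 1024L * 1024L)
    println(s"${files.size} files -> ${groups.size} groups") // prints "860 files -> 7 groups"
  }
}
```

Within Spark itself, a lever that may help is calling coalesce(n) without a shuffle on the RDD returned by the query (e.g. on hiveContext.sql(...).rdd): it merges partitions through a narrow dependency, so the read stage should run as n tasks, at the cost of each task reading more data.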
Spark 1.3.1 + Hive: write output to CSV with header on S3
Hello community, I'm currently using Spark 1.3.1 with Hive support to output processed data to an external Hive table backed by S3. I'm specifying the delimiter manually, but I'd like to know if there is any clean way to write in CSV format:
import org.apache.spark.{SparkConf, SparkContext}
val sparkConf = new SparkConf()
val sc = new SparkContext(sparkConf)
val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
import hiveContext.implicits._
hiveContext.sql("CREATE EXTERNAL TABLE IF NOT EXISTS table_name(field1 STRING, field2 STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LOCATION '" + path_on_s3 + "'")
hiveContext.sql("<an INSERT OVERWRITE query to write into the above table>")
I also need the table header to be printed in each written file. I tried with:
hiveContext.sql("set hive.cli.print.header=true")
But it didn't work. Any hint? Thank you. Best regards, Roberto
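One possible workaround, assuming you are willing to write plain text files yourself (e.g. with saveAsTextFile) instead of going through the Hive table's INSERT OVERWRITE: each partition becomes one output file, so prepending the header to every partition puts it at the top of every file. The CsvWithHeader helper below is a hypothetical sketch, not a Spark or Hive API; its quoting follows RFC 4180 (fields containing commas, quotes or line breaks are wrapped in double quotes, with embedded quotes doubled).

```scala
// Hypothetical helper (not a Spark or Hive API): turns rows into CSV lines and
// puts a header line at the start of every partition.
object CsvWithHeader {

  // RFC 4180-style quoting: wrap the field in double quotes only if it
  // contains a comma, a double quote or a line break; double embedded quotes.
  def escape(field: String): String =
    if (field.exists(c => c == ',' || c == '"' || c == '\n' || c == '\r'))
      "\"" + field.replace("\"", "\"\"") + "\""
    else field

  def toLine(fields: Seq[String]): String = fields.map(escape).mkString(",")

  // Shaped for rdd.mapPartitions: header first, then one line per row.
  def withHeader(header: Seq[String])(rows: Iterator[Seq[String]]): Iterator[String] =
    Iterator.single(toLine(header)) ++ rows.map(toLine)

  def main(args: Array[String]): Unit = {
    // Local stand-in for the rows of one partition.
    val partition = Iterator(Seq("a", "b"), Seq("x", "y,z"))
    withHeader(Seq("field1", "field2"))(partition).foreach(println)
  }
}
```

With a DataFrame df holding the two columns, usage could look like df.rdd.map(r => Seq(r.getString(0), r.getString(1))).mapPartitions(rows => CsvWithHeader.withHeader(Seq("field1", "field2"))(rows)).saveAsTextFile(path_on_s3). One caveat: empty partitions would produce files containing only the header.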
Re: Spark 1.4 RDD to DF fails with toDF()
I had a similar issue. Could yours be related to this as well: https://issues.apache.org/jira/browse/SPARK-8368 ? On Fri, Jun 26, 2015 at 2:00 PM, Akhil Das ak...@sigmoidanalytics.com wrote: Are those provided Spark libraries compatible with Scala 2.11? Thanks Best Regards On Fri, Jun 26, 2015 at 4:48 PM, Srikanth srikanth...@gmail.com wrote: Thanks Akhil for checking this out. Here is my build.sbt:
name := "Weblog Analysis"
version := "1.0"
scalaVersion := "2.11.5"
javacOptions ++= Seq("-source", "1.7", "-target", "1.7")
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "1.4.0" % "provided",
  "org.apache.spark" %% "spark-sql" % "1.4.0",
  "org.apache.spark" %% "spark-streaming" % "1.4.0",
  "org.apache.spark" %% "spark-streaming-kafka" % "1.4.0",
  "org.apache.spark" %% "spark-mllib" % "1.4.0",
  "org.apache.commons" % "commons-lang3" % "3.0",
  "org.eclipse.jetty" % "jetty-client" % "8.1.14.v20131031",
  "org.scalatest" %% "scalatest" % "2.2.1" % "test",
  "com.databricks" % "spark-csv_2.11" % "1.0.3",
  "joda-time" % "joda-time" % "2.8.1",
  "org.joda" % "joda-convert" % "1.7"
)
resolvers ++= Seq(
  "Sonatype OSS Snapshots" at "http://oss.sonatype.org/content/repositories/snapshots/",
  "Sonatype public" at "http://oss.sonatype.org/content/groups/public/",
  "Sonatype" at "http://nexus.scala-tools.org/content/repositories/public",
  "Scala Tools" at "http://scala-tools.org/repo-snapshots/",
  "Typesafe" at "http://repo.typesafe.com/typesafe/releases/",
  "Akka" at "http://akka.io/repository/",
  "JBoss" at "http://repository.jboss.org/nexus/content/groups/public/",
  "GuiceyFruit" at "http://guiceyfruit.googlecode.com/svn/repo/releases/"
)
On Fri, Jun 26, 2015 at 4:13 AM, Akhil Das ak...@sigmoidanalytics.com wrote: It's a Scala version conflict, can you paste your build.sbt file?
Thanks Best Regards

On Fri, Jun 26, 2015 at 7:05 AM, stati srikanth...@gmail.com wrote: Hello, when I run a Spark job with spark-submit, it fails with the exception below for the code line

val webLogDF = webLogRec.toDF().select("ip", "date", "name")

I had a similar issue running from spark-shell, then realized that I needed sqlContext.implicits._. Now my code has the following imports:

import org.apache.spark._
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._

The code works fine from the spark-shell REPL. It also runs fine in local mode from Eclipse. I get this error only when I submit to the cluster using spark-submit:

bin/spark-submit /local/weblog-analysis_2.11-1.0.jar --class WebLogAnalysis --master spark://machu:7077

I'm testing with Spark 1.4. My code was built using Scala 2.11, with Spark and Spark SQL 1.4.0 as dependencies in build.sbt.

Exception in thread "main" java.lang.NoSuchMethodError: scala.reflect.api.JavaUniverse.runtimeMirror(Ljava/lang/ClassLoader;)Lscala/reflect/api/JavaUniverse$JavaMirror;
    at WebLogAnalysis$.readWebLogFiles(WebLogAnalysis.scala:38)
    at WebLogAnalysis$.main(WebLogAnalysis.scala:62)
    at WebLogAnalysis.main(WebLogAnalysis.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:664)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:169)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:192)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:111)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

I can provide more code or logs if that helps. Let me know.
Srikanth -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-4-RDD-to-DF-fails-with-toDF-tp23499.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
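A NoSuchMethodError on scala.reflect.api.JavaUniverse.runtimeMirror is a typical symptom of a Scala binary mismatch: toDF() needs runtime reflection, the jar was compiled against Scala 2.11, and the Spark 1.4 installation on the cluster most likely runs Scala 2.10 (the default for the pre-built 1.4 packages, as far as I know). The sketch below, run as the first thing in main, compares the Scala library actually loaded at runtime with the binary version the build targeted. The object name and the hard-coded "2.11" are hypothetical; scala.util.Properties.versionNumberString is the standard library call that reports the runtime version.

```scala
object ScalaVersionCheck {
  /** Reduces a full Scala version such as "2.11.5" to its binary version "2.11". */
  def binaryVersion(full: String): String =
    full.split('.').take(2).mkString(".")

  /** Returns an error message when the runtime binary version differs from the expected one. */
  def mismatch(expectedBinary: String, runtimeFull: String): Option[String] =
    if (binaryVersion(runtimeFull) == expectedBinary) None
    else Some(s"Compiled for Scala $expectedBinary but running on Scala $runtimeFull")

  def main(args: Array[String]): Unit = {
    // Hypothetical expected value: keep it in sync with scalaVersion in build.sbt.
    val expected = "2.11"
    val runtime = scala.util.Properties.versionNumberString
    mismatch(expected, runtime) match {
      case Some(msg) => System.err.println(msg)
      case None      => println(s"Scala runtime $runtime matches $expected")
    }
  }
}
```

If the versions differ, the usual options are rebuilding with a 2.10.x scalaVersion (switching the spark-csv dependency to %% so its suffix follows) or running a Spark build compiled for Scala 2.11.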
Re: java.lang.OutOfMemoryError: PermGen space
Glad it worked! Actually, I got similar issues even with drivers based on Spark Streaming v1.2.x. Also consider that the default config in Spark on EMR is 512m! Roberto

On Thu, Jun 25, 2015 at 1:20 AM, Srikanth srikanth...@gmail.com wrote: That worked. Thanks! I wonder what changed in 1.4 to cause this. It wouldn't work with anything less than 256m for a simple piece of code. 1.3.1 used to work with the default (64m, I think). Srikanth

On Wed, Jun 24, 2015 at 12:47 PM, Roberto Coluccio roberto.coluc...@gmail.com wrote: Did you try passing it as a spark-shell argument, with --driver-java-options "-XX:MaxPermSize=256m"? Roberto

On Wed, Jun 24, 2015 at 5:57 PM, stati srikanth...@gmail.com wrote: Hello, I moved from 1.3.1 to 1.4.0 and started getting java.lang.OutOfMemoryError: PermGen space when I use spark-shell. The same Scala code works fine in the 1.3.1 spark-shell. I was loading the same set of external JARs and had the same imports in 1.3.1. I tried increasing the perm size to 256m, but I still got OOM:

SPARK_REPL_OPTS=-XX:MaxPermSize=256m bin/spark-shell --master spark://machu:7077 --total-executor-cores 12 --packages com.databricks:spark-csv_2.10:1.0.3 --packages joda-time:joda-time:2.8.1

The Spark UI Environment tab didn't show -XX:MaxPermSize, so I'm not sure whether this config was picked up. This is standalone mode. Any pointers on the next step?
Re: java.lang.OutOfMemoryError: PermGen space
Did you try passing it as a spark-shell argument, with --driver-java-options "-XX:MaxPermSize=256m"? Roberto

On Wed, Jun 24, 2015 at 5:57 PM, stati srikanth...@gmail.com wrote: Hello, I moved from 1.3.1 to 1.4.0 and started getting java.lang.OutOfMemoryError: PermGen space when I use spark-shell. The same Scala code works fine in the 1.3.1 spark-shell. I was loading the same set of external JARs and had the same imports in 1.3.1. I tried increasing the perm size to 256m, but I still got OOM:

SPARK_REPL_OPTS=-XX:MaxPermSize=256m bin/spark-shell --master spark://machu:7077 --total-executor-cores 12 --packages com.databricks:spark-csv_2.10:1.0.3 --packages joda-time:joda-time:2.8.1

The Spark UI Environment tab didn't show -XX:MaxPermSize, so I'm not sure whether this config was picked up. This is standalone mode. Any pointers on the next step?
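Whether -XX:MaxPermSize actually reached the driver JVM can be checked from inside spark-shell itself, since the JVM exposes its own startup flags through the standard java.lang.management API. A minimal sketch follows; the object name JvmFlags is hypothetical. Keep in mind that PermGen only exists up to Java 7; on Java 8 the flag is ignored.

```scala
import java.lang.management.ManagementFactory
import scala.collection.JavaConverters._

object JvmFlags {
  /** Returns the arguments that start with the given prefix. */
  def find(args: Seq[String], prefix: String): Seq[String] =
    args.filter(_.startsWith(prefix))

  /** JVM arguments the current process was actually started with. */
  def current: Seq[String] =
    ManagementFactory.getRuntimeMXBean.getInputArguments.asScala.toList

  def main(args: Array[String]): Unit = {
    val perm = find(current, "-XX:MaxPermSize")
    if (perm.isEmpty) println("No -XX:MaxPermSize flag: the JVM default applies")
    else println("PermGen flag(s): " + perm.mkString(" "))
  }
}
```

Pasting the object into the shell and calling JvmFlags.main(Array()) reports what the driver JVM received, independently of what the Environment tab shows.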
Re: [Spark 1.3.1 on YARN on EMR] Unable to instantiate org.apache.hadoop.hive.metastore.HiveMetaStoreClient
I confirm: Christopher was very kind in helping me out here. The solution presented in the linked doc worked perfectly. IMO it should be linked in the official Spark documentation. Thanks again, Roberto

On 20 Jun 2015, at 19:25, Bozeman, Christopher bozem...@amazon.com wrote: We worked it out. There were multiple items (like the location of the remote metastore and DB user auth) needed to make HiveContext happy in yarn-cluster mode. For reference: https://github.com/awslabs/emr-bootstrap-actions/blob/master/spark/examples/using-hivecontext-yarn-cluster.md -Christopher Bozeman

On Jun 20, 2015, at 7:24 AM, Andrew Lee alee...@hotmail.com wrote: Hi Roberto, I'm not an EMR person, but it looks like option -h deploys the necessary DataNucleus JARs for you. The requirements for HiveContext are hive-site.xml and the DataNucleus JARs. As long as these two are there, and Spark is compiled with -Phive, it should work. spark-shell runs in yarn-client mode. I'm not sure whether your other application is running in the same mode or a different one. Try specifying yarn-client mode and see if you get the same result as with spark-shell.

From: roberto.coluc...@gmail.com Date: Wed, 10 Jun 2015 14:32:04 +0200 Subject: [Spark 1.3.1 on YARN on EMR] Unable to instantiate org.apache.hadoop.hive.metastore.HiveMetaStoreClient To: user@spark.apache.org

Hi! I'm struggling with an issue with Spark 1.3.1 running on YARN on an AWS EMR cluster. The cluster is based on AMI 3.7.0 (hence Amazon Linux 2015.03, Hive 0.13 already installed and configured on the cluster, Hadoop 2.4, etc.). I use the AWS emr-bootstrap-action install-spark (https://github.com/awslabs/emr-bootstrap-actions/tree/master/spark) with the option/version -v1.3.1e so as to get the latest Spark for EMR installed and available. I also have a simple Spark Streaming driver in my project. The driver is part of a larger Maven project: in the pom.xml I'm currently using [...]
<scala.binary.version>2.10</scala.binary.version>
<scala.version>2.10.4</scala.version>
<java.version>1.7</java.version>
<spark.version>1.3.1</spark.version>
<hadoop.version>2.4.1</hadoop.version>
[...]
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming_${scala.binary.version}</artifactId>
  <version>${spark.version}</version>
  <scope>provided</scope>
  <exclusions>
    <exclusion>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
    </exclusion>
  </exclusions>
</dependency>
<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-client</artifactId>
  <version>${hadoop.version}</version>
  <scope>provided</scope>
</dependency>
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-hive_${scala.binary.version}</artifactId>
  <version>${spark.version}</version>
  <scope>provided</scope>
</dependency>

In fact, at compile and build time everything works just fine if, in my driver, I have:

val sparkConf = new SparkConf()
  .setAppName(appName)
  .set("spark.local.dir", "/tmp/" + appName)
  .set("spark.streaming.unpersist", "true")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .registerKryoClasses(Array(classOf[java.net.URI], classOf[String]))
val sc = new SparkContext(sparkConf)
val ssc = new StreamingContext(sc, config.batchDuration)
import org.apache.spark.streaming.StreamingContext._
ssc.checkpoint(sparkConf.get("spark.local.dir") + checkpointRelativeDir)
// some input reading actions
// some input transformation actions
val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
import sqlContext.implicits._
sqlContext.sql(an-HiveQL-query)
ssc.start()
ssc.awaitTerminationOrTimeout(config.timeout)

What happens is that, right after being launched, the driver fails with the exception:

15/06/10 11:38:18 ERROR yarn.ApplicationMaster: User class threw exception: java.lang.RuntimeException: Unable to instantiate org.apache.hadoop.hive.metastore.HiveMetaStoreClient
java.lang.RuntimeException: java.lang.RuntimeException: Unable to instantiate org.apache.hadoop.hive.metastore.HiveMetaStoreClient
    at org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:346)
    at org.apache.spark.sql.hive.HiveContext.sessionState$lzycompute(HiveContext.scala:239)
    at org.apache.spark.sql.hive.HiveContext.sessionState(HiveContext.scala:235)
    at org.apache.spark.sql.hive.HiveContext.hiveconf$lzycompute(HiveContext.scala:251)
    at org.apache.spark.sql.hive.HiveContext.hiveconf(HiveContext.scala:250)
    at org.apache.spark.sql.hive.HiveContext.sql(HiveContext.scala:95)
    at
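Andrew's checklist (hive-site.xml plus the DataNucleus JARs) can be verified from inside the driver itself, which matters in yarn-cluster mode because the driver runs in the ApplicationMaster container and only sees what was shipped there. The Spark SQL guide of that era suggests shipping the DataNucleus JARs with --jars and hive-site.xml with --files. The sketch below just logs whether both are visible; the object name is hypothetical, and the class name is the one from the ClassNotFoundException in this thread.

```scala
import scala.util.Try

object HiveContextPrereqs {
  // Class named in the ClassNotFoundException from the driver log.
  val DataNucleusJdoFactory = "org.datanucleus.api.jdo.JDOPersistenceManagerFactory"

  /** True if the class can be loaded by the given class loader (without initializing it). */
  def classAvailable(name: String, loader: ClassLoader): Boolean =
    Try(Class.forName(name, false, loader)).isSuccess

  /** True if a resource such as hive-site.xml is visible on the classpath. */
  def resourceAvailable(name: String, loader: ClassLoader): Boolean =
    loader.getResource(name) != null

  def report(loader: ClassLoader): Seq[String] = Seq(
    s"$DataNucleusJdoFactory loadable: ${classAvailable(DataNucleusJdoFactory, loader)}",
    s"hive-site.xml on classpath: ${resourceAvailable("hive-site.xml", loader)}"
  )

  def main(args: Array[String]): Unit =
    report(Thread.currentThread.getContextClassLoader).foreach(println)
}
```

Calling report before creating the HiveContext turns the opaque "Unable to instantiate HiveMetaStoreClient" into two explicit yes/no lines in the driver log.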
[Spark 1.3.1 on YARN on EMR] Unable to instantiate org.apache.hadoop.hive.metastore.HiveMetaStoreClient
Hi! I'm struggling with an issue with Spark 1.3.1 running on YARN on an AWS EMR cluster. The cluster is based on AMI 3.7.0 (hence Amazon Linux 2015.03, Hive 0.13 already installed and configured on the cluster, Hadoop 2.4, etc.). I use the AWS emr-bootstrap-action install-spark (https://github.com/awslabs/emr-bootstrap-actions/tree/master/spark) with the option/version -v1.3.1e so as to get the latest Spark for EMR installed and available. I also have a simple Spark Streaming driver in my project. The driver is part of a larger Maven project: in the pom.xml I'm currently using

[...]
<scala.binary.version>2.10</scala.binary.version>
<scala.version>2.10.4</scala.version>
<java.version>1.7</java.version>
<spark.version>1.3.1</spark.version>
<hadoop.version>2.4.1</hadoop.version>
[...]
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming_${scala.binary.version}</artifactId>
  <version>${spark.version}</version>
  <scope>provided</scope>
  <exclusions>
    <exclusion>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
    </exclusion>
  </exclusions>
</dependency>
<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-client</artifactId>
  <version>${hadoop.version}</version>
  <scope>provided</scope>
</dependency>
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-hive_${scala.binary.version}</artifactId>
  <version>${spark.version}</version>
  <scope>provided</scope>
</dependency>

In fact, at compile and build time everything works just fine if, in my driver, I have:

val sparkConf = new SparkConf()
  .setAppName(appName)
  .set("spark.local.dir", "/tmp/" + appName)
  .set("spark.streaming.unpersist", "true")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .registerKryoClasses(Array(classOf[java.net.URI], classOf[String]))
val sc = new SparkContext(sparkConf)
val ssc = new StreamingContext(sc, config.batchDuration)
import org.apache.spark.streaming.StreamingContext._
ssc.checkpoint(sparkConf.get("spark.local.dir") + checkpointRelativeDir)
// some input reading actions
// some input transformation actions
val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
import sqlContext.implicits._
sqlContext.sql(an-HiveQL-query)
ssc.start()
ssc.awaitTerminationOrTimeout(config.timeout)

What happens is that, right after being launched, the driver fails with the exception:

15/06/10 11:38:18 ERROR yarn.ApplicationMaster: User class threw exception: java.lang.RuntimeException: Unable to instantiate org.apache.hadoop.hive.metastore.HiveMetaStoreClient
java.lang.RuntimeException: java.lang.RuntimeException: Unable to instantiate org.apache.hadoop.hive.metastore.HiveMetaStoreClient
    at org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:346)
    at org.apache.spark.sql.hive.HiveContext.sessionState$lzycompute(HiveContext.scala:239)
    at org.apache.spark.sql.hive.HiveContext.sessionState(HiveContext.scala:235)
    at org.apache.spark.sql.hive.HiveContext.hiveconf$lzycompute(HiveContext.scala:251)
    at org.apache.spark.sql.hive.HiveContext.hiveconf(HiveContext.scala:250)
    at org.apache.spark.sql.hive.HiveContext.sql(HiveContext.scala:95)
    at myDriver.scala: line of the sqlContext.sql(query)
Caused by some stuff
Caused by: javax.jdo.JDOFatalUserException: Class org.datanucleus.api.jdo.JDOPersistenceManagerFactory was not found.
NestedThrowables: java.lang.ClassNotFoundException: org.datanucleus.api.jdo.JDOPersistenceManagerFactory
...
Caused by: java.lang.ClassNotFoundException: org.datanucleus.api.jdo.JDOPersistenceManagerFactory

Suspecting a wrong Hive installation/configuration or a wrong libs/classpath definition, I SSHed into the cluster and launched a spark-shell. Excluding the app configuration and the StreamingContext usage/definition, I then carried out all the actions listed in the driver implementation, in particular all the Hive-related ones, and they all went through smoothly!
I also tried the optional -h argument (https://github.com/awslabs/emr-bootstrap-actions/blob/master/spark/README.md#arguments-optional) of the install-spark emr-bootstrap-action, but the driver failed the very same way. Furthermore, when launching a spark-shell (on the EMR cluster with Spark installed with the -h option), I also got:

15/06/09 14:20:51 WARN conf.HiveConf: hive-default.xml not found on CLASSPATH
15/06/09 14:20:52 INFO metastore.HiveMetaStore: 0: Opening raw store with implemenation class:org.apache.hadoop.hive.metastore.ObjectStore
15/06/09 14:20:52 INFO metastore.ObjectStore: ObjectStore, initialize called
15/06/09 14:20:52 WARN DataNucleus.General: Plugin (Bundle) org.datanucleus is already registered. Ensure you dont have multiple JAR versions
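The last warning ("Plugin (Bundle) org.datanucleus is already registered") hints that more than one copy of the DataNucleus JARs is on the classpath. A small sketch to list every classpath location that provides a given class file, using the standard ClassLoader.getResources call; the object name is hypothetical, and the default resource is the JDO factory class from the exception above (pass another .class path as the first argument to check other DataNucleus classes).

```scala
import scala.collection.JavaConverters._

object ClasspathDuplicates {
  /** Every classpath URL that provides the given resource, e.g. a .class file. */
  def locations(resource: String, loader: ClassLoader): List[String] =
    loader.getResources(resource).asScala.map(_.toString).toList

  def main(args: Array[String]): Unit = {
    // Default: the class from the ClassNotFoundException in this thread.
    val resource = args.headOption.getOrElse("org/datanucleus/api/jdo/JDOPersistenceManagerFactory.class")
    val found = locations(resource, Thread.currentThread.getContextClassLoader)
    found.size match {
      case 0 => println(s"$resource: not on the classpath")
      case 1 => println(s"$resource: ${found.head}")
      case n =>
        println(s"$resource: found $n times (possible duplicate JARs):")
        found.foreach(url => println("  " + url))
    }
  }
}
```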
Spark SQL weird exception after upgrading from 1.1.1 to 1.2.x
Hi everybody, when trying to upgrade from Spark 1.1.1 to Spark 1.2.x (I tried both 1.2.0 and 1.2.1) I've encountered a weird error that never occurred before, and I'd kindly ask for any possible help. In particular, all my Spark SQL queries fail with the following exception: java.lang.RuntimeException: [1.218] failure: identifier expected [my query listed] ^ at scala.sys.package$.error(package.scala:27) at org.apache.spark.sql.catalyst.AbstractSparkSQLParser.apply(SparkSQLParser.scala:33) at org.apache.spark.sql.SQLContext$$anonfun$1.apply(SQLContext.scala:79) at org.apache.spark.sql.SQLContext$$anonfun$1.apply(SQLContext.scala:79) at org.apache.spark.sql.catalyst.SparkSQLParser$$anonfun$org$apache$spark$sql$catalyst$SparkSQLParser$$others$1.apply(SparkSQLParser.scala:174) at org.apache.spark.sql.catalyst.SparkSQLParser$$anonfun$org$apache$spark$sql$catalyst$SparkSQLParser$$others$1.apply(SparkSQLParser.scala:173) at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:136) at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:135) at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242) at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242) ... The unit tests I've got for this stuff fail both when I build and test the project with Maven and when I run them as single ScalaTest files or test suites/packages. When running my app as usual on EMR in YARN-cluster mode, I get the following: 15/03/17 11:32:14 INFO yarn.ApplicationMaster: Final app status: FAILED, exitCode: 15, (reason: User class threw exception: [1.218] failure: identifier expected SELECT * FROM ... (my query) ^) Exception in thread Driver java.lang.RuntimeException: [1.218] failure: identifier expected SELECT * FROM ...
(my query) ^ at scala.sys.package$.error(package.scala:27) at org.apache.spark.sql.catalyst.AbstractSparkSQLParser.apply(SparkSQLParser.scala:33) at org.apache.spark.sql.SQLContext$$anonfun$1.apply(SQLContext.scala:79) at org.apache.spark.sql.SQLContext$$anonfun$1.apply(SQLContext.scala:79) at org.apache.spark.sql.catalyst.SparkSQLParser$$anonfun$org$apache$spark$sql$catalyst$SparkSQLParser$$others$1.apply(SparkSQLParser.scala:174) at org.apache.spark.sql.catalyst.SparkSQLParser$$anonfun$org$apache$spark$sql$catalyst$SparkSQLParser$$others$1.apply(SparkSQLParser.scala:173) at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:136) at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:135) at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242) at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242) at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222) at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1$$anonfun$apply$2.apply(Parsers.scala:254) at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1$$anonfun$apply$2.apply(Parsers.scala:254) at scala.util.parsing.combinator.Parsers$Failure.append(Parsers.scala:202) at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254) at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254) at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222) at scala.util.parsing.combinator.Parsers$$anon$2$$anonfun$apply$14.apply(Parsers.scala:891) at scala.util.parsing.combinator.Parsers$$anon$2$$anonfun$apply$14.apply(Parsers.scala:891) at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57) at scala.util.parsing.combinator.Parsers$$anon$2.apply(Parsers.scala:890) at scala.util.parsing.combinator.PackratParsers$$anon$1.apply(PackratParsers.scala:110) at 
org.apache.spark.sql.catalyst.AbstractSparkSQLParser.apply(SparkSQLParser.scala:31) at org.apache.spark.sql.SQLContext$$anonfun$parseSql$1.apply(SQLContext.scala:83) at org.apache.spark.sql.SQLContext$$anonfun$parseSql$1.apply(SQLContext.scala:83) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.sql.SQLContext.parseSql(SQLContext.scala:83) at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:303) at mycompany.mypackage.MyClassFunction.apply(MyClassFunction.scala:34) at mycompany.mypackage.MyClass$.main(MyClass.scala:254) at mycompany.mypackage.MyClass.main(MyClass.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:441) 15/03/17 11:32:14 INFO
Re: Spark SQL weird exception after upgrading from 1.1.1 to 1.2.x
You know, I actually have a column called timestamp! I guess this may well be what triggers the problem reported in the bug you linked. On Wed, Mar 18, 2015 at 3:37 PM, Cheng Lian lian.cs@gmail.com wrote: I suspect that you hit this bug: https://issues.apache.org/jira/browse/SPARK-6250; it depends on the actual contents of your query. Yin has opened a PR for this; although it's not merged yet, it should be a valid fix: https://github.com/apache/spark/pull/5078 This fix will be included in 1.3.1. Cheng On 3/18/15 10:04 PM, Roberto Coluccio wrote: Hi Cheng, thanks for your reply. The query is something like: SELECT * FROM ( SELECT m.column1, IF (d.columnA IS NOT null, d.columnA, m.column2), ..., m.columnN FROM tableD d RIGHT OUTER JOIN tableM m on m.column2 = d.columnA WHERE m.column2!=\"None\" AND d.columnA!=\"\" UNION ALL SELECT ... [another SELECT statement with different conditions but same tables] UNION ALL SELECT ... [another SELECT statement with different conditions but same tables] ) a I'm using just sqlContext, no hiveContext. Please note once again that this worked perfectly with Spark 1.1.x. The tables, i.e. tableD and tableM, are previously registered with the RDD.registerTempTable method, where the input RDDs are actually RDD[MyCaseClassM/D], with MyCaseClassM and MyCaseClassD being simple case classes with only String fields (fewer than 22). Hope the situation is a bit clearer now. Thanks to anyone who will help me out here. Roberto On Wed, Mar 18, 2015 at 12:09 PM, Cheng Lian lian.cs@gmail.com wrote: Would you mind providing the query? If it's confidential, could you please help construct a query that reproduces this issue? Cheng On 3/18/15 6:03 PM, Roberto Coluccio wrote: Hi everybody, when trying to upgrade from Spark 1.1.1 to Spark 1.2.x (I tried both 1.2.0 and 1.2.1) I've encountered a weird error that never occurred before, and I'd kindly ask for any possible help.
In particular, all my Spark SQL queries fail with the following exception: java.lang.RuntimeException: [1.218] failure: identifier expected [my query listed] ^ at scala.sys.package$.error(package.scala:27) at org.apache.spark.sql.catalyst.AbstractSparkSQLParser.apply(SparkSQLParser.scala:33) at org.apache.spark.sql.SQLContext$$anonfun$1.apply(SQLContext.scala:79) at org.apache.spark.sql.SQLContext$$anonfun$1.apply(SQLContext.scala:79) at org.apache.spark.sql.catalyst.SparkSQLParser$$anonfun$org$apache$spark$sql$catalyst$SparkSQLParser$$others$1.apply(SparkSQLParser.scala:174) at org.apache.spark.sql.catalyst.SparkSQLParser$$anonfun$org$apache$spark$sql$catalyst$SparkSQLParser$$others$1.apply(SparkSQLParser.scala:173) at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:136) at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:135) at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242) at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242) ... The unit tests I've got for this stuff fail both when I build and test the project with Maven and when I run them as single ScalaTest files or test suites/packages. When running my app as usual on EMR in YARN-cluster mode, I get the following: 15/03/17 11:32:14 INFO yarn.ApplicationMaster: Final app status: FAILED, exitCode: 15, (reason: User class threw exception: [1.218] failure: identifier expected SELECT * FROM ... (my query) ^) Exception in thread Driver java.lang.RuntimeException: [1.218] failure: identifier expected SELECT * FROM ...
(my query) ^ at scala.sys.package$.error(package.scala:27) at org.apache.spark.sql.catalyst.AbstractSparkSQLParser.apply(SparkSQLParser.scala:33) at org.apache.spark.sql.SQLContext$$anonfun$1.apply(SQLContext.scala:79) at org.apache.spark.sql.SQLContext$$anonfun$1.apply(SQLContext.scala:79) at org.apache.spark.sql.catalyst.SparkSQLParser$$anonfun$org$apache$spark$sql$catalyst$SparkSQLParser$$others$1.apply(SparkSQLParser.scala:174) at org.apache.spark.sql.catalyst.SparkSQLParser$$anonfun$org$apache$spark$sql$catalyst$SparkSQLParser$$others$1.apply(SparkSQLParser.scala:173) at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:136) at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:135) at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala
Re: Spark SQL weird exception after upgrading from 1.1.1 to 1.2.x
Hi Cheng, thanks for your reply. The query is something like: SELECT * FROM ( SELECT m.column1, IF (d.columnA IS NOT null, d.columnA, m.column2), ..., m.columnN FROM tableD d RIGHT OUTER JOIN tableM m on m.column2 = d.columnA WHERE m.column2!=\"None\" AND d.columnA!=\"\" UNION ALL SELECT ... [another SELECT statement with different conditions but same tables] UNION ALL SELECT ... [another SELECT statement with different conditions but same tables] ) a I'm using just sqlContext, no hiveContext. Please note once again that this worked perfectly with Spark 1.1.x. The tables, i.e. tableD and tableM, are previously registered with the RDD.registerTempTable method, where the input RDDs are actually RDD[MyCaseClassM/D], with MyCaseClassM and MyCaseClassD being simple case classes with only String fields (fewer than 22). Hope the situation is a bit clearer now. Thanks to anyone who will help me out here. Roberto On Wed, Mar 18, 2015 at 12:09 PM, Cheng Lian lian.cs@gmail.com wrote: Would you mind providing the query? If it's confidential, could you please help construct a query that reproduces this issue? Cheng On 3/18/15 6:03 PM, Roberto Coluccio wrote: Hi everybody, when trying to upgrade from Spark 1.1.1 to Spark 1.2.x (I tried both 1.2.0 and 1.2.1) I've encountered a weird error that never occurred before, and I'd kindly ask for any possible help.
In particular, all my Spark SQL queries fail with the following exception: java.lang.RuntimeException: [1.218] failure: identifier expected [my query listed] ^ at scala.sys.package$.error(package.scala:27) at org.apache.spark.sql.catalyst.AbstractSparkSQLParser.apply(SparkSQLParser.scala:33) at org.apache.spark.sql.SQLContext$$anonfun$1.apply(SQLContext.scala:79) at org.apache.spark.sql.SQLContext$$anonfun$1.apply(SQLContext.scala:79) at org.apache.spark.sql.catalyst.SparkSQLParser$$anonfun$org$apache$spark$sql$catalyst$SparkSQLParser$$others$1.apply(SparkSQLParser.scala:174) at org.apache.spark.sql.catalyst.SparkSQLParser$$anonfun$org$apache$spark$sql$catalyst$SparkSQLParser$$others$1.apply(SparkSQLParser.scala:173) at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:136) at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:135) at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242) at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242) ... The unit tests I've got for this stuff fail both when I build and test the project with Maven and when I run them as single ScalaTest files or test suites/packages. When running my app as usual on EMR in YARN-cluster mode, I get the following: 15/03/17 11:32:14 INFO yarn.ApplicationMaster: Final app status: FAILED, exitCode: 15, (reason: User class threw exception: [1.218] failure: identifier expected SELECT * FROM ... (my query) ^) Exception in thread Driver java.lang.RuntimeException: [1.218] failure: identifier expected SELECT * FROM ...
(my query) ^ at scala.sys.package$.error(package.scala:27) at org.apache.spark.sql.catalyst.AbstractSparkSQLParser.apply(SparkSQLParser.scala:33) at org.apache.spark.sql.SQLContext$$anonfun$1.apply(SQLContext.scala:79) at org.apache.spark.sql.SQLContext$$anonfun$1.apply(SQLContext.scala:79) at org.apache.spark.sql.catalyst.SparkSQLParser$$anonfun$org$apache$spark$sql$catalyst$SparkSQLParser$$others$1.apply(SparkSQLParser.scala:174) at org.apache.spark.sql.catalyst.SparkSQLParser$$anonfun$org$apache$spark$sql$catalyst$SparkSQLParser$$others$1.apply(SparkSQLParser.scala:173) at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:136) at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:135) at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242) at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242) at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222) at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1$$anonfun$apply$2.apply(Parsers.scala:254) at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1$$anonfun$apply$2.apply(Parsers.scala:254) at scala.util.parsing.combinator.Parsers$Failure.append(Parsers.scala:202) at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254
Re: Spark SQL weird exception after upgrading from 1.1.1 to 1.2.x
Hey Cheng, thank you so much for your suggestion: the problem was actually a column/field called timestamp in one of the case classes!! Once I renamed it, everything worked fine again. I must say it was kinda frustrating ... Roberto On Wed, Mar 18, 2015 at 4:07 PM, Roberto Coluccio roberto.coluc...@gmail.com wrote: You know, I actually have a column called timestamp! I guess this may well be what triggers the problem reported in the bug you linked. On Wed, Mar 18, 2015 at 3:37 PM, Cheng Lian lian.cs@gmail.com wrote: I suspect that you hit this bug: https://issues.apache.org/jira/browse/SPARK-6250; it depends on the actual contents of your query. Yin has opened a PR for this; although it's not merged yet, it should be a valid fix: https://github.com/apache/spark/pull/5078 This fix will be included in 1.3.1. Cheng On 3/18/15 10:04 PM, Roberto Coluccio wrote: Hi Cheng, thanks for your reply. The query is something like: SELECT * FROM ( SELECT m.column1, IF (d.columnA IS NOT null, d.columnA, m.column2), ..., m.columnN FROM tableD d RIGHT OUTER JOIN tableM m on m.column2 = d.columnA WHERE m.column2!=\"None\" AND d.columnA!=\"\" UNION ALL SELECT ... [another SELECT statement with different conditions but same tables] UNION ALL SELECT ... [another SELECT statement with different conditions but same tables] ) a I'm using just sqlContext, no hiveContext. Please note once again that this worked perfectly with Spark 1.1.x. The tables, i.e. tableD and tableM, are previously registered with the RDD.registerTempTable method, where the input RDDs are actually RDD[MyCaseClassM/D], with MyCaseClassM and MyCaseClassD being simple case classes with only String fields (fewer than 22). Hope the situation is a bit clearer now. Thanks to anyone who will help me out here. Roberto On Wed, Mar 18, 2015 at 12:09 PM, Cheng Lian lian.cs@gmail.com wrote: Would you mind providing the query? If it's confidential, could you please help construct a query that reproduces this issue?
Cheng On 3/18/15 6:03 PM, Roberto Coluccio wrote: Hi everybody, when trying to upgrade from Spark 1.1.1 to Spark 1.2.x (I tried both 1.2.0 and 1.2.1) I've encountered a weird error that never occurred before, and I'd kindly ask for any possible help. In particular, all my Spark SQL queries fail with the following exception: java.lang.RuntimeException: [1.218] failure: identifier expected [my query listed] ^ at scala.sys.package$.error(package.scala:27) at org.apache.spark.sql.catalyst.AbstractSparkSQLParser.apply(SparkSQLParser.scala:33) at org.apache.spark.sql.SQLContext$$anonfun$1.apply(SQLContext.scala:79) at org.apache.spark.sql.SQLContext$$anonfun$1.apply(SQLContext.scala:79) at org.apache.spark.sql.catalyst.SparkSQLParser$$anonfun$org$apache$spark$sql$catalyst$SparkSQLParser$$others$1.apply(SparkSQLParser.scala:174) at org.apache.spark.sql.catalyst.SparkSQLParser$$anonfun$org$apache$spark$sql$catalyst$SparkSQLParser$$others$1.apply(SparkSQLParser.scala:173) at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:136) at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:135) at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242) at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242) ... The unit tests I've got for this stuff fail both when I build and test the project with Maven and when I run them as single ScalaTest files or test suites/packages. When running my app as usual on EMR in YARN-cluster mode, I get the following: 15/03/17 11:32:14 INFO yarn.ApplicationMaster: Final app status: FAILED, exitCode: 15, (reason: User class threw exception: [1.218] failure: identifier expected SELECT * FROM ... (my query) ^) Exception in thread Driver java.lang.RuntimeException: [1.218] failure: identifier expected SELECT * FROM ...
(my query) ^ at scala.sys.package$.error(package.scala:27) at org.apache.spark.sql.catalyst.AbstractSparkSQLParser.apply(SparkSQLParser.scala:33) at org.apache.spark.sql.SQLContext$$anonfun$1.apply(SQLContext.scala:79) at org.apache.spark.sql.SQLContext$$anonfun$1.apply(SQLContext.scala:79) at org.apache.spark.sql.catalyst.SparkSQLParser$$anonfun$org$apache$spark$sql$catalyst$SparkSQLParser$$others$1.apply(SparkSQLParser.scala:174) at org.apache.spark.sql.catalyst.SparkSQLParser
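Since the root cause turned out to be a case-class field named timestamp (SPARK-6250 in Spark 1.2.x, fixed in 1.3.1 according to Cheng), a cheap guard is a unit test that rejects field names the parser chokes on before the classes are registered as temp tables. A minimal sketch using plain Java reflection; the object name is hypothetical, and the reserved set contains only the one word confirmed in this thread, so extending it is an assumption left to the reader.

```scala
object ReservedFieldCheck {
  // Only "timestamp" is confirmed by this thread; add other words as you hit them.
  val Reserved: Set[String] = Set("timestamp")

  /** Declared field names of `cls` that collide with a reserved word (case-insensitive). */
  def collisions(cls: Class[_], reserved: Set[String] = Reserved): Seq[String] =
    cls.getDeclaredFields.toSeq.map(_.getName).filter(n => reserved.contains(n.toLowerCase))
}
```

In a test suite this could be run against classOf[MyCaseClassM] and classOf[MyCaseClassD], failing the build whenever the result is non-empty.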
Spark UI port issue when deploying Spark driver on YARN in yarn-cluster mode on EMR
Hello folks, I'm trying to deploy a Spark driver on Amazon EMR in yarn-cluster mode, expecting to be able to access the Spark UI at the spark-master-ip:4040 address (the default port). The problem is that the Spark UI port is always chosen randomly at runtime, even though I also tried to specify it in the spark-defaults.conf file. To do so, I followed this: https://github.com/awslabs/emr-bootstrap-actions/tree/master/spark#3-utilize-an-emr-step-to-configure-the-spark-default-configuration-optional , setting spark.ui.port to a static, known value. No luck: every time I launch a Spark driver (using the spark-submit script from the YARN master node), the UI port is chosen randomly. Are there any configuration settings I'm missing here? Thank you very much. Roberto
Access resources from jar-local resources folder
Hello folks,

I have a Spark Streaming application built with Maven (as a jar) and deployed with the spark-submit script. The application project has the following (main) structure:

myApp
  src
    main
      scala
        com.mycompany.package
          MyApp.scala
          DoSomething.scala
          ...
      resources
        aPerlScript.pl
        ...
    test
      scala
        com.mycompany.package
          MyAppTest.scala
          ...
  target
  ...
  pom.xml

In the DoSomething.scala object I have a method (let's call it doSomething()) that tries to run a Perl script, taken from the resources folder, as an external scala.sys.process.Process. I then call DoSomething.doSomething().

OK, here's the *issue*: I was not able to access that script, not with absolute paths, relative paths, getClass.getClassLoader.getResource or getClass.getResource, even though I have specified the resources folder in my pom.xml... None of my attempts succeeded: I don't know how to find the files I put in src/main/resources. I will appreciate any help.

SIDE NOTES:
- I use an external Process instead of a Spark pipe because, at this step of my workflow, I must handle binary files as input and output.
- I'm using spark-streaming 1.1.0, Scala 2.10.4 and Java 7. I build the jar with Maven install from within Eclipse (Kepler).
- When I use the standard getClass.getClassLoader.getResource method to access resources, I find that the actual classpath is that of the spark-submit script.

Thank you and best regards,
Roberto
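A common workaround, sketched here as an assumption rather than as an answer from this thread: once Maven packages src/main/resources into the jar, aPerlScript.pl is a jar entry, not a file on disk. `getResource` then returns a `jar:file:...!/aPerlScript.pl` URL, which an external `perl` process cannot open. Reading the entry with `getResourceAsStream`, copying it to a temporary file, and passing that file's path to `perl` avoids the problem. `ResourceScript`, `extractResource` and `runPerlResource` are hypothetical names. The sketch assumes the script sits at the jar root as `aPerlScript.pl` and that `perl` is on the PATH of whichever machine runs the code.

```scala
import java.io.File
import java.nio.file.{Files, StandardCopyOption}
import scala.sys.process._

object ResourceScript {

  /** Copies a classpath resource (e.g. a file Maven packaged from
    * src/main/resources into the jar) to a temporary local file.
    * Returns None if the resource is not on the classpath. */
  def extractResource(name: String): Option[File] = {
    // getResourceAsStream works whether the resource lives in a directory or
    // inside a jar; a jar entry has no real filesystem path of its own.
    val in = getClass.getClassLoader.getResourceAsStream(name)
    if (in == null) None
    else {
      try {
        val baseName = name.substring(name.lastIndexOf('/') + 1)
        val tmp = Files.createTempFile("extracted-", "-" + baseName)
        Files.copy(in, tmp, StandardCopyOption.REPLACE_EXISTING)
        tmp.toFile.deleteOnExit() // clean up when the JVM exits
        Some(tmp.toFile)
      } finally in.close()
    }
  }

  /** Hypothetical helper: runs a Perl script shipped inside the jar and
    * returns its exit code. Assumes `perl` is on the PATH. */
  def runPerlResource(name: String, args: String*): Int = {
    val script = extractResource(name).getOrElse(
      throw new IllegalArgumentException(s"resource not found: $name"))
    (Seq("perl", script.getAbsolutePath) ++ args).!
  }
}
```

For example, `ResourceScript.runPerlResource("aPerlScript.pl")` would run the packaged script and return its exit code. Note that the resource name passed to `ClassLoader.getResourceAsStream` has no leading slash. If this is called inside a Spark task, the copy is made on each executor's local disk, so `perl` must be installed on every worker node.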