Re: physical memory usage keeps increasing for spark app on Yarn
Hi Yang! I don't know exactly why this happens, but I think either GC can't keep up, or the size of the data plus the additional objects created during computation is too big for the executor. I also found that this problem shows up only when you do some data manipulations. You can cache your data first and after that write it in one partition. For example val dropDF = df.drop("hh").drop("mm").drop("mode").drop("y").drop("m").drop("d") dropDF.cache() or dropDF.write.mode(SaveMode.ErrorIfExists).parquet(temppath) val dropDF = spark.read.parquet(temppath) and then dropDF.repartition(1).write.mode(SaveMode.ErrorIfExists).parquet(targetpath) Best, On Sun, Jan 22, 2017 at 12:31 PM Yang Cao wrote: > Also, do you know why this happens? > > On January 20, 2017, at 18:23, Pavel Plotnikov > wrote: > > Hi Yang, > I have faced the same problem on Mesos, and to circumvent this issue I > usually increase the partition number. On the last step in your code you reduce the > number of partitions to 1; try to set a bigger value, maybe it will solve this > problem. > > Cheers, > Pavel > > On Fri, Jan 20, 2017 at 12:35 PM Yang Cao wrote: > > Hi all, > > I am running a spark application in YARN-client mode with 6 executors > (each with 4 cores, executor memory = 6G and overhead = 4G, spark version: > 1.6.3 / 2.1.0). I find that my executor memory keeps increasing until it gets > killed by the node manager, which gives out the info telling me to boost > spark.yarn.executor.memoryOverhead. > I know that this param mainly controls the size of memory allocated > off-heap. But I don't know when and how the spark engine will use this part > of memory. Also, increasing that part of memory does not always solve my > problem; sometimes it works, sometimes not. It tends to be useless when the > input data is large. > > FYI, my app's logic is quite simple. It combines the small files > generated in one single day (one directory per day) into a single one and > writes it back to hdfs. Here is the core code: > > val df = spark.read.parquet(originpath).filter(s"m = ${ts.month} AND d = > ${ts.day}").coalesce(400) > > val dropDF = > df.drop("hh").drop("mm").drop("mode").drop("y").drop("m").drop("d") > > dropDF.repartition(1).write.mode(SaveMode.ErrorIfExists).parquet(targetpath) > > The source may have hundreds to thousands of partitions, and the > total parquet size is around 1 to 5 GB. I also find that in the step that > shuffles data between machines, the size of the shuffle read is > about 4 times larger than the input size, which is weird, or follows some principle > I don't know. > > Anyway, I have done some searching myself on this problem. Some articles say > it's about the direct buffer memory (which I don't set myself). Some articles say > that people solved it with more frequent full GC. I also found one > person on SO with a very similar situation: > http://stackoverflow.com/questions/31646679/ever-increasing-physical-memory-for-a-spark-application-in-yarn > This guy claimed that it's a bug with parquet, but a comment questioned him. > People on this mailing list may also have received an email hours ago from > blondowski, who described this problem while writing json: > http://apache-spark-user-list.1001560.n3.nabble.com/Executors-running-out-of-memory-tt28325.html#none > > So it looks like a common question across different output formats. I hope > someone with experience of this problem could explain why it happens, > and what a reliable way to solve it is. > > Best, > >
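For reference, a minimal sketch of how the overhead setting discussed above is passed to spark-submit on YARN; the sizes mirror the poster's setup and are illustrative, not recommendations:

    # spark.yarn.executor.memoryOverhead is specified in megabytes in Spark 1.6;
    # Spark 2.3+ renamed it to spark.executor.memoryOverhead
    spark-submit \
      --master yarn \
      --deploy-mode client \
      --num-executors 6 \
      --executor-cores 4 \
      --executor-memory 6g \
      --conf spark.yarn.executor.memoryOverhead=4096 \
      your-app.jar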
Re: Using mapWithState without a checkpoint
Hello spark users, I do have the same question as Daniel. I would like to save the state in Cassandra and on failure recover using the initialState. If someone has already tried this, please share your experience and sample code. Thanks. On Thu, Nov 17, 2016 at 9:45 AM, Daniel Haviv < daniel.ha...@veracity-group.com> wrote: > Hi, > Is it possible to use mapWithState without checkpointing at all ? > I'd rather have the whole application fail, restart and reload an > initialState RDD than pay for checkpointing every 10 batches. > > Thank you, > Daniel >
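Since this comes up repeatedly, here is a minimal sketch of the recovery pattern being asked about, assuming the state was previously persisted externally (loadStateFromCassandra and wordStream are hypothetical placeholders; wordStream is a DStream[(String, Int)]):

    import org.apache.spark.streaming.{State, StateSpec}

    // Hypothetical helper that reads previously saved (word, count) pairs back from Cassandra
    val initialRDD = loadStateFromCassandra(sc) // RDD[(String, Long)]

    val mappingFunc = (word: String, one: Option[Int], state: State[Long]) => {
      val sum = one.getOrElse(0).toLong + state.getOption.getOrElse(0L)
      state.update(sum)
      (word, sum)
    }

    val stateStream = wordStream.mapWithState(
      StateSpec.function(mappingFunc).initialState(initialRDD))

Note that, as pointed out later in this list, mapWithState still requires a checkpoint directory to cut the RDD lineage, even when recovery relies on initialState rather than on the checkpoint data.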
How to write Spark DataFrame to salesforce table
Hi, I am trying to write a dataframe into a salesforce table. facWriteBack.write.mode("append").jdbc(s"jdbc:salesforce:UseSandbox=True;User=;Password=;Security Token=;Timeout=0", "SFORCE.") Am I doing it correctly? Also, could you suggest how to upsert a record? Thanks in advance Niraj
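For what it's worth, a hedged sketch of the generic DataFrame-over-JDBC write pattern (the Salesforce URL options and driver class are vendor-specific assumptions, and the table name is a placeholder since the original message truncates it). Note that DataFrameWriter.jdbc with mode "append" only issues inserts; upserting generally requires the vendor driver's own upsert/bulk API, or writing to a staging table and merging there:

    import java.util.Properties

    val props = new Properties()
    props.setProperty("driver", "com.example.salesforce.Driver") // hypothetical driver class

    facWriteBack.write
      .mode("append") // insert semantics only, not upsert
      .jdbc(
        "jdbc:salesforce:UseSandbox=True;User=...;Password=...;Security Token=...;Timeout=0",
        "SFORCE.YourTable", // placeholder; the original message truncates the table name
        props)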
Spark Streaming proactive monitoring
Hi there, From the Spark UI, we can monitor the following two metrics: • Processing Time - the time to process each batch of data. • Scheduling Delay - the time a batch waits in a queue for the processing of previous batches to finish. However, what is the best way to monitor them proactively? For example, if the processing time/scheduling delay exceeds a certain threshold, send an alert to the admin/developer? Lan
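One programmatic option is a StreamingListener registered on the StreamingContext; a minimal sketch, where the threshold value and the alerting stub are assumptions:

    import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}

    class DelayAlertListener(maxDelayMs: Long) extends StreamingListener {
      // Stub: replace with real alerting (email, pager, metrics system, ...)
      private def alert(msg: String): Unit = println(s"ALERT: $msg")

      override def onBatchCompleted(batch: StreamingListenerBatchCompleted): Unit = {
        val info = batch.batchInfo
        val scheduling = info.schedulingDelay.getOrElse(0L) // milliseconds
        val processing = info.processingDelay.getOrElse(0L) // milliseconds
        if (scheduling > maxDelayMs || processing > maxDelayMs) {
          alert(s"Batch ${info.batchTime}: scheduling=${scheduling}ms, processing=${processing}ms")
        }
      }
    }

    ssc.addStreamingListener(new DelayAlertListener(maxDelayMs = 30000))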
Re: Setting startingOffsets to earliest in structured streaming never catches up
+1 to Ryan's suggestion of setting maxOffsetsPerTrigger. This way you can at least see how quickly it is making progress towards catching up. On Sun, Jan 22, 2017 at 7:02 PM, Timothy Chan wrote: > I'm using version 2.0.2. > > The difference I see between using latest and earliest is a series of jobs > that take less than a second vs. one job that goes on for over 24 hours. > > On Sun, Jan 22, 2017 at 6:54 PM Shixiong(Ryan) Zhu < > shixi...@databricks.com> wrote: > >> Which Spark version are you using? If you are using 2.1.0, could you use >> the monitoring APIs (http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#monitoring-streaming-queries) to check the input rate and the processing >> rate? One possible issue is that the Kafka source launched a pretty large >> batch and it took too long to finish it. If so, you can use the >> "maxOffsetsPerTrigger" option to limit the data size in a batch in order to >> observe the progress. >> >> On Sun, Jan 22, 2017 at 10:22 AM, Timothy Chan >> wrote: >> >> I'm running my structured streaming jobs in EMR. We were thinking a worst >> case scenario recovery situation would be to spin up another cluster and >> set startingOffsets to earliest (our Kafka cluster has a retention policy >> of 7 days). >> >> My observation is that the job never catches up to latest. This is not >> acceptable. I've set the number of partitions for the topic to 6. I've >> tried using a cluster of 4 in EMR. >> >> The producer rate for this topic is 4 events/second. Does anyone have any >> suggestions on what I can do to have my consumer catch up to latest faster? >> >> >>
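For context, a minimal sketch of where that option goes in the Spark 2.1 Kafka source (broker address, topic, and cap are illustrative); bounding the records per micro-batch turns one giant catch-up batch into many small, observable ones:

    val stream = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "broker1:9092")
      .option("subscribe", "mytopic")
      .option("startingOffsets", "earliest")
      .option("maxOffsetsPerTrigger", "10000") // max total offsets consumed per trigger
      .load()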
Re: why does spark web UI keep changing its port?
No. Each app has its own UI which runs (starting on) port 4040. On Mon, Jan 23, 2017 at 12:05 PM, kant kodali wrote: > I am using standalone mode, so wouldn't it be 8080 for my app web UI as well? > There is nothing running on 4040 in my cluster. > > http://spark.apache.org/docs/latest/security.html#standalone-mode-only > > On Mon, Jan 23, 2017 at 11:51 AM, Marcelo Vanzin > wrote: >> >> That's the Master, whose default port is 8080 (not 4040). The default >> port for the app's UI is 4040. >> >> On Mon, Jan 23, 2017 at 11:47 AM, kant kodali wrote: >> > I am not sure why the Spark web UI keeps changing its port every time I >> > restart >> > a cluster. How can I make it always run on one port? I did make sure >> > there >> > is no process running on 4040 (spark's default web UI port), however it >> > still >> > starts at 8080. Any ideas? >> > >> > >> > MasterWebUI: Bound MasterWebUI to 0.0.0.0, and started at >> > http://x.x.x.x:8080 >> > >> > >> > Thanks! >> >> >> >> -- >> Marcelo > > -- Marcelo
Re: why does spark web UI keep changing its port?
I am using standalone mode, so wouldn't it be 8080 for my app web UI as well? There is nothing running on 4040 in my cluster. http://spark.apache.org/docs/latest/security.html#standalone-mode-only On Mon, Jan 23, 2017 at 11:51 AM, Marcelo Vanzin wrote: > That's the Master, whose default port is 8080 (not 4040). The default > port for the app's UI is 4040. > > On Mon, Jan 23, 2017 at 11:47 AM, kant kodali wrote: > > I am not sure why the Spark web UI keeps changing its port every time I > restart > > a cluster. How can I make it always run on one port? I did make sure > there > > is no process running on 4040 (spark's default web UI port), however it still > > starts at 8080. Any ideas? > > > > > > MasterWebUI: Bound MasterWebUI to 0.0.0.0, and started at > > http://x.x.x.x:8080 > > > > > > Thanks! > > > > -- > Marcelo >
Re: why does spark web UI keep changing its port?
hmm.. I guess in that case my assumption of "app" is wrong. I thought the app is a client jar that you submit, no? If so, say I submit multiple jobs, do I then get two UIs? On Mon, Jan 23, 2017 at 12:07 PM, Marcelo Vanzin wrote: > No. Each app has its own UI which runs (starting on) port 4040. > > On Mon, Jan 23, 2017 at 12:05 PM, kant kodali wrote: > > I am using standalone mode, so wouldn't it be 8080 for my app web ui as well? > > There is nothing running on 4040 in my cluster. > > > > http://spark.apache.org/docs/latest/security.html#standalone-mode-only > > > > On Mon, Jan 23, 2017 at 11:51 AM, Marcelo Vanzin > > wrote: > >> > >> That's the Master, whose default port is 8080 (not 4040). The default > >> port for the app's UI is 4040. > >> > >> On Mon, Jan 23, 2017 at 11:47 AM, kant kodali > wrote: > >> > I am not sure why the Spark web UI keeps changing its port every time I > >> > restart > >> > a cluster. How can I make it always run on one port? I did make sure > >> > there > >> > is no process running on 4040 (spark's default web UI port), however it > >> > still > >> > starts at 8080. Any ideas? > >> > > >> > > >> > MasterWebUI: Bound MasterWebUI to 0.0.0.0, and started at > >> > http://x.x.x.x:8080 > >> > > >> > > >> > Thanks! > >> > >> > >> > >> -- > >> Marcelo > > > > > > -- > Marcelo >
Re: why does spark web UI keep changing its port?
As I said, each app gets its own UI. Look at the logs printed to the output. The port will depend on whether they're running on the same host at the same time. This is irrespective of how they are run. On Mon, Jan 23, 2017 at 12:40 PM, kant kodali wrote: > yes, I meant submitting through spark-submit. > > So if I do spark-submit A.jar and spark-submit A.jar again, do I get two > UIs or one? And which ports do they run on when using standalone > mode? > > On Mon, Jan 23, 2017 at 12:19 PM, Marcelo Vanzin > wrote: >> >> Depends on what you mean by "job", which is why I prefer "app", which >> is clearer (something you submit using "spark-submit", for example). >> >> But really, I'm not sure what you're asking now. >> >> On Mon, Jan 23, 2017 at 12:15 PM, kant kodali wrote: >> > hmm.. I guess in that case my assumption of "app" is wrong. I thought the >> > app >> > is a client jar that you submit, no? If so, say I submit multiple jobs >> > then >> > do I get two UIs? >> > >> > On Mon, Jan 23, 2017 at 12:07 PM, Marcelo Vanzin >> > wrote: >> >> >> >> No. Each app has its own UI which runs (starting on) port 4040. >> >> >> >> On Mon, Jan 23, 2017 at 12:05 PM, kant kodali >> >> wrote: >> >> > I am using standalone mode, so wouldn't it be 8080 for my app web ui as >> >> > well? >> >> > There is nothing running on 4040 in my cluster. >> >> > >> >> > >> >> > http://spark.apache.org/docs/latest/security.html#standalone-mode-only >> >> > >> >> > On Mon, Jan 23, 2017 at 11:51 AM, Marcelo Vanzin >> >> > >> >> > wrote: >> >> >> >> >> >> That's the Master, whose default port is 8080 (not 4040). The >> >> >> default >> >> >> port for the app's UI is 4040. >> >> >> >> >> >> On Mon, Jan 23, 2017 at 11:47 AM, kant kodali >> >> >> wrote: >> >> >> > I am not sure why the Spark web UI keeps changing its port every time >> >> >> > I >> >> >> > restart >> >> >> > a cluster. How can I make it always run on one port? I did make >> >> >> > sure >> >> >> > there >> >> >> > is no process running on 4040 (spark's default web UI port) however >> >> >> > it >> >> >> > still >> >> >> > starts at 8080. Any ideas? >> >> >> > >> >> >> > >> >> >> > MasterWebUI: Bound MasterWebUI to 0.0.0.0, and started at >> >> >> > http://x.x.x.x:8080 >> >> >> > >> >> >> > >> >> >> > Thanks! >> >> >> >> >> >> >> >> >> >> >> >> -- >> >> >> Marcelo >> >> > >> >> > >> >> >> >> >> >> >> >> -- >> >> Marcelo >> > >> > >> >> >> >> -- >> Marcelo > > -- Marcelo
Re: why does spark web UI keep changing its port?
Depends on what you mean by "job", which is why I prefer "app", which is clearer (something you submit using "spark-submit", for example). But really, I'm not sure what you're asking now. On Mon, Jan 23, 2017 at 12:15 PM, kant kodali wrote: > hmm.. I guess in that case my assumption of "app" is wrong. I thought the app > is a client jar that you submit, no? If so, say I submit multiple jobs, do I then > get two UIs? > > On Mon, Jan 23, 2017 at 12:07 PM, Marcelo Vanzin > wrote: >> >> No. Each app has its own UI which runs (starting on) port 4040. >> >> On Mon, Jan 23, 2017 at 12:05 PM, kant kodali wrote: >> > I am using standalone mode, so wouldn't it be 8080 for my app web ui as >> > well? >> > There is nothing running on 4040 in my cluster. >> > >> > http://spark.apache.org/docs/latest/security.html#standalone-mode-only >> > >> > On Mon, Jan 23, 2017 at 11:51 AM, Marcelo Vanzin >> > wrote: >> >> >> >> That's the Master, whose default port is 8080 (not 4040). The default >> >> port for the app's UI is 4040. >> >> >> >> On Mon, Jan 23, 2017 at 11:47 AM, kant kodali >> >> wrote: >> >> > I am not sure why the Spark web UI keeps changing its port every time I >> >> > restart >> >> > a cluster. How can I make it always run on one port? I did make sure >> >> > there >> >> > is no process running on 4040 (spark's default web UI port), however it >> >> > still >> >> > starts at 8080. Any ideas? >> >> > >> >> > >> >> > MasterWebUI: Bound MasterWebUI to 0.0.0.0, and started at >> >> > http://x.x.x.x:8080 >> >> > >> >> > >> >> > Thanks! >> >> >> >> >> >> >> >> -- >> >> Marcelo >> > >> > >> >> >> >> -- >> Marcelo > > -- Marcelo
Re: why does spark web UI keep changing its port?
That's the Master, whose default port is 8080 (not 4040). The default port for the app's UI is 4040. On Mon, Jan 23, 2017 at 11:47 AM, kant kodali wrote: > I am not sure why the Spark web UI keeps changing its port every time I restart > a cluster. How can I make it always run on one port? I did make sure there > is no process running on 4040 (spark's default web UI port), however it still > starts at 8080. Any ideas? > > > MasterWebUI: Bound MasterWebUI to 0.0.0.0, and started at > http://x.x.x.x:8080 > > > Thanks! -- Marcelo
why does spark web UI keep changing its port?
I am not sure why the Spark web UI keeps changing its port every time I restart a cluster. How can I make it always run on one port? I did make sure there is no process running on 4040 (spark's default web UI port), however it still starts at 8080. Any ideas? MasterWebUI: Bound MasterWebUI to 0.0.0.0, and started at http://x.x.x.x:8080 Thanks!
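To summarize the thread with a hedged sketch of pinning the two ports involved (property names are standard; values illustrative): the 8080 line in the log above is the standalone Master's UI, while each application's UI starts at 4040 and increments when that port is already taken by another app on the same host.

    # Application (driver) UI; default 4040
    spark-submit --conf spark.ui.port=4040 your-app.jar

    # Standalone Master web UI; default 8080, set in conf/spark-env.sh on the master node
    export SPARK_MASTER_WEBUI_PORT=8080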
Spark streaming multiple kafka topic doesn't work at-least-once
Hi everyone, I have a spark (1.6.0-cdh5.7.1) streaming job which receives data from multiple kafka topics. After starting the job, everything works fine at first (like 700 req/sec), but after a while (a couple of days or a week) it starts processing only part of the data (like 350 req/sec). When I check the kafka topics, I can see that there are still 700 req/sec coming into the topics. I don't see any errors, exceptions or any other problem. The job works fine when I start the same code with just a single kafka topic. Do you have any idea or a clue to understand the problem? Thanks.
Re: converting timestamp column to a java.util.Date
Hi, I think a Spark UDF can only handle `java.sql.Date`, so you need to change the return type in your UDF. // maropu On Tue, Jan 24, 2017 at 8:18 AM, Marco Mistroni wrote: > Hi all, > I am trying to convert a string column in a Dataframe to a > java.util.Date, but I am getting this exception: > > [dispatcher-event-loop-0] INFO org.apache.spark.storage.BlockManagerInfo > - Removed broadcast_0_piece0 on 169.254.2.140:53468 in memory (size: 14.3 > KB, free: 767.4 MB) > Exception in thread "main" java.lang.UnsupportedOperationException: > Schema for type java.util.Date is not supported > > Here's my code: > > val tstampToDateFunc:(java.sql.Timestamp => java.util.Date) = ts => new > java.util.Date(ts.getTime) > val tsampConversionFunc = udf(tstampToDateFunc) > > sharesDf.withColumn("price", col("_c2").cast("double")) > .withColumn("creationTime", > tsampConversionFunc(col("_c1"))) > > Are there any workarounds? > I'm trying to import data into mongoDB via Spark. The source is a csv file > where > I have 1 timestamp column and a bunch of strings. I will need to > convert that > to something compatible with mongo's ISODate. > > kr > marco > > -- --- Takeshi Yamamuro
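A minimal sketch of the suggested fix applied to the code quoted above: return java.sql.Date (or java.sql.Timestamp), both of which Spark SQL can map to a Catalyst type, instead of java.util.Date:

    import org.apache.spark.sql.functions.{col, udf}

    // java.sql.Date has a supported schema; java.util.Date does not, which is
    // what triggers "Schema for type java.util.Date is not supported"
    val tstampToDateFunc: java.sql.Timestamp => java.sql.Date =
      ts => new java.sql.Date(ts.getTime)
    val tsampConversionFunc = udf(tstampToDateFunc)

    sharesDf.withColumn("price", col("_c2").cast("double"))
            .withColumn("creationTime", tsampConversionFunc(col("_c1")))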
converting timestamp column to a java.util.Date
Hi all, I am trying to convert a string column in a Dataframe to a java.util.Date, but I am getting this exception: [dispatcher-event-loop-0] INFO org.apache.spark.storage.BlockManagerInfo - Removed broadcast_0_piece0 on 169.254.2.140:53468 in memory (size: 14.3 KB, free: 767.4 MB) Exception in thread "main" java.lang.UnsupportedOperationException: Schema for type java.util.Date is not supported Here's my code: val tstampToDateFunc:(java.sql.Timestamp => java.util.Date) = ts => new java.util.Date(ts.getTime) val tsampConversionFunc = udf(tstampToDateFunc) sharesDf.withColumn("price", col("_c2").cast("double")) .withColumn("creationTime", tsampConversionFunc(col("_c1"))) Are there any workarounds? I'm trying to import data into mongoDB via Spark. The source is a csv file where I have 1 timestamp column and a bunch of strings. I will need to convert the timestamp to something compatible with mongo's ISODate. kr marco
Re: Do jobs fail because of other users of a cluster?
In general, Java processes fail with an OutOfMemoryError when your code and data do not fit into the memory allocated to the runtime. In Spark, that memory is controlled through the --executor-memory flag. If you are running Spark on YARN, then the YARN configuration dictates the maximum memory that your Spark executors can request. Here is a pretty good article about setting memory in Spark on YARN: http://www.cloudera.com/documentation/enterprise/5-5-x/topics/cdh_ig_running_spark_on_yarn.html If the OS were to kill your process because the system has run out of memory, you would see an error printed to standard error that looks like this: Java HotSpot(TM) 64-Bit Server VM warning: INFO: os::commit_memory(0xe232, 37601280, 0) failed; error='Cannot allocate memory' (errno=12) # There is insufficient memory for the Java Runtime Environment to continue. On Wed, Jan 18, 2017 at 10:25 AM, David Frese wrote: >> Hello everybody, >> >> Being quite new to Spark, I am struggling a lot with OutOfMemory exceptions >> and "GC overhead limit reached" failures of my jobs, submitted from a >> spark-shell with "master yarn". >> >> Playing with --num-executors, --executor-memory and --executor-cores I >> occasionally get something done. But I'm also not the only one using the >> cluster, and it seems to me that my jobs sometimes fail with the above >> errors because other people have something running, or have a spark-shell >> open at that time; or at least it seems that with the same code, data and >> settings, the job sometimes completes and sometimes fails. >> >> Is that "expected behaviour"? >> >> What options/tools can be used to make the success/failure of a job >> deterministic? There are a lot of things out there, like 'dynamic allocation' and the >> Hadoop 'fair scheduler', but it is very hard for a newbie to evaluate them (resp. >> make suggestions to the admins). >> >> If it cannot be made deterministic, how can I reliably distinguish the OOM >> failures that are caused by incorrect settings on my side (e.g. because my >> data does not fit into memory) from those failures that are caused by >> resource consumption/blocking from other jobs? >> >> Thanks for sharing your thoughts and experiences! >>
Re: tuning the spark.locality.wait
This article recommends setting spark.locality.wait to 10 (milliseconds) in the case of Spark Streaming and explains why they chose that value; if you are using batch Spark, that value should still be a good starting place: https://www.inovex.de/blog/247-spark-streaming-on-yarn-in-production/ On Sat, Jan 21, 2017 at 5:06 AM, Cesar wrote: > > I am working with datasets on the order of 200 GB, using 286 cores divided > across 143 executors. Each executor has 32 GB (which makes every core 15 > GB). And I am using Spark 1.6. > > > I would like to tune spark.locality.wait. Can anyone give me a > range of values for spark.locality.wait that I can try? > > > > Thanks a lot! > -- > Cesar Flores >
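For reference, a minimal sketch of applying that value; Spark accepts time suffixes such as "ms" and "s" here, and 10 ms is the article's streaming-oriented suggestion, so treat it as a starting point to measure against rather than a universal answer:

    import org.apache.spark.SparkConf

    val conf = new SparkConf().set("spark.locality.wait", "10ms")
    // or at submit time: spark-submit --conf spark.locality.wait=10ms ...
    // Finer-grained knobs also exist: spark.locality.wait.process, .node, .rack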
How to make the state in a streaming application idempotent?
In a word-count application that stores the counts of all words input so far using mapWithState, how do I make sure my counts are not messed up if I happen to read a line more than once? Appreciate your response. Thanks
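One possible approach, sketched under assumptions that are not in the original question: give each record a unique eventId upstream and keep the set of already-counted IDs in the state next to the count, so replayed records are ignored (in production the ID set would need bounding or expiry, e.g. by time window):

    import org.apache.spark.streaming.{State, StateSpec}

    // State per word: (running count, IDs of records already counted)
    val mappingFunc = (word: String, in: Option[(String, Int)], state: State[(Long, Set[String])]) => {
      val (count, seen) = state.getOption.getOrElse((0L, Set.empty[String]))
      val updated = in match {
        case Some((eventId, n)) if !seen.contains(eventId) => (count + n, seen + eventId)
        case _ => (count, seen) // replayed (ID already seen) or empty: no change
      }
      state.update(updated)
      (word, updated._1)
    }

    // pairs: DStream[(String, (String, Int))] keyed by word, valued by (eventId, 1)
    // val counts = pairs.mapWithState(StateSpec.function(mappingFunc))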
what would be the recommended production requirements?
Hi, I am planning to go to production using spark standalone mode with the following configuration, and I would like to know if I am missing something; any other suggestions are welcome. 1) Three Spark Standalone Masters deployed on different nodes, using Apache Zookeeper for leader election. 2) Two or three worker nodes (for our workloads, which need to process 5000 messages/sec, two worker nodes were more than sufficient when we ran our tests, but to be safe we may use three). 3) We will use HDFS for storing recoverable state, WAL, checkpoints etc., since we are running a streaming application. 4) Some sort of monitoring and alerting framework. Do I need anything else apart from this? What's not clear to me is how service discovery is done. For example, right now we manually have to edit the IP addresses of worker machines in SPARK_HOME/conf/slaves, so we have to bring the entire cluster down. So what is the most common way to solve this, given that we don't plan on using mesos or yarn? I know of some tools which can help here, but I would like to know which of those tools are widely used. Any other suggestions, in case I am missing something, are welcome. Thanks, kant
Re: why does spark web UI keep changing its port?
yes, I meant submitting through spark-submit. So if I do spark-submit A.jar and spark-submit A.jar again, do I get two UIs or one? And which ports do they run on when using standalone mode? On Mon, Jan 23, 2017 at 12:19 PM, Marcelo Vanzin wrote: > Depends on what you mean by "job", which is why I prefer "app", which > is clearer (something you submit using "spark-submit", for example). > > But really, I'm not sure what you're asking now. > > On Mon, Jan 23, 2017 at 12:15 PM, kant kodali wrote: > > hmm.. I guess in that case my assumption of "app" is wrong. I thought the > app > > is a client jar that you submit, no? If so, say I submit multiple jobs, > do I then > > get two UIs? > > > > On Mon, Jan 23, 2017 at 12:07 PM, Marcelo Vanzin > > wrote: > >> > >> No. Each app has its own UI which runs (starting on) port 4040. > >> > >> On Mon, Jan 23, 2017 at 12:05 PM, kant kodali > wrote: > >> > I am using standalone mode, so wouldn't it be 8080 for my app web ui as well? > >> > There is nothing running on 4040 in my cluster. > >> > > >> > http://spark.apache.org/docs/latest/security.html#standalone-mode-only > >> > > >> > On Mon, Jan 23, 2017 at 11:51 AM, Marcelo Vanzin > > >> > wrote: > >> >> > >> >> That's the Master, whose default port is 8080 (not 4040). The default > >> >> port for the app's UI is 4040. > >> >> > >> >> On Mon, Jan 23, 2017 at 11:47 AM, kant kodali > >> >> wrote: > >> >> > I am not sure why the Spark web UI keeps changing its port every time I > >> >> > restart > >> >> > a cluster. How can I make it always run on one port? I did make sure > >> >> > there > >> >> > is no process running on 4040 (spark's default web UI port), however it > >> >> > still > >> >> > starts at 8080. Any ideas? > >> >> > > >> >> > > >> >> > MasterWebUI: Bound MasterWebUI to 0.0.0.0, and started at > >> >> > http://x.x.x.x:8080 > >> >> > > >> >> > > >> >> > Thanks! > >> >> > >> >> > >> >> > >> >> -- > >> >> Marcelo > >> > > >> > > >> > >> > >> > >> -- > >> Marcelo > > > > > > -- > Marcelo >
Re: Spark streaming multiple kafka topic doesn't work at-least-once
Are you using a receiver-based or direct stream? Are you doing 1 stream per topic, or 1 stream for all topics? If you're using the direct stream, the actual topics and offset ranges should be visible in the logs, so you should be able to see more detail about what's happening (e.g. all topics still being processed but offsets significantly behind, vs. only certain topics being processed but keeping up with the latest offsets). On Mon, Jan 23, 2017 at 3:14 PM, hakanilter wrote: > Hi everyone, > > I have a spark (1.6.0-cdh5.7.1) streaming job which receives data from > multiple kafka topics. After starting the job, everything works fine at first > (like 700 req/sec), but after a while (a couple of days or a week) it starts > processing only part of the data (like 350 req/sec). When I check the > kafka topics, I can see that there are still 700 req/sec coming into the > topics. I don't see any errors, exceptions or any other problem. The job > works fine when I start the same code with just a single kafka topic. > > Do you have any idea or a clue to understand the problem? > > Thanks. >
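A minimal sketch of surfacing those offset ranges with the Spark 1.6 direct stream (topic names and Kafka params are illustrative):

    import kafka.serializer.StringDecoder
    import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils}

    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, Map("metadata.broker.list" -> "broker1:9092"), Set("topicA", "topicB"))

    stream.foreachRDD { rdd =>
      // Log per-topic/partition progress to spot topics that are falling behind
      rdd.asInstanceOf[HasOffsetRanges].offsetRanges.foreach { r =>
        println(s"${r.topic} p${r.partition}: ${r.fromOffset} -> ${r.untilOffset}")
      }
      // ... normal processing ...
    }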
Re: Do jobs fail because of other users of a cluster?
ok On Tue, Jan 24, 2017 at 7:13 AM, Matthew Dailey wrote: > In general, Java processes fail with an OutOfMemoryError when your code > and data do not fit into the memory allocated to the runtime. In Spark, > that memory is controlled through the --executor-memory flag. > If you are running Spark on YARN, then the YARN configuration dictates the > maximum memory that your Spark executors can request. Here is a pretty > good article about setting memory in Spark on YARN: > http://www.cloudera.com/documentation/enterprise/5-5-x/topics/cdh_ig_running_spark_on_yarn.html > > If the OS were to kill your process because the system has run out of > memory, you would see an error printed to standard error that looks like > this: > > Java HotSpot(TM) 64-Bit Server VM warning: INFO: > os::commit_memory(0xe232, 37601280, 0) failed; error='Cannot > allocate memory' (errno=12) > # There is insufficient memory for the Java Runtime Environment to continue. > > > > On Wed, Jan 18, 2017 at 10:25 AM, David Frese > wrote: > >> Hello everybody, >> >> Being quite new to Spark, I am struggling a lot with OutOfMemory exceptions >> and "GC overhead limit reached" failures of my jobs, submitted from a >> spark-shell with "master yarn". >> >> Playing with --num-executors, --executor-memory and --executor-cores I >> occasionally get something done. But I'm also not the only one using the >> cluster, and it seems to me that my jobs sometimes fail with the above >> errors because other people have something running, or have a spark-shell >> open at that time; or at least it seems that with the same code, data and >> settings, the job sometimes completes and sometimes fails. >> >> Is that "expected behaviour"? >> >> What options/tools can be used to make the success/failure of a job >> deterministic? There are a lot of things out there, like 'dynamic allocation' and the >> Hadoop 'fair scheduler', but it is very hard for a newbie to evaluate them (resp. >> make suggestions to the admins). >> >> If it cannot be made deterministic, how can I reliably distinguish the OOM >> failures that are caused by incorrect settings on my side (e.g. because my >> data does not fit into memory) from those failures that are caused by >> resource consumption/blocking from other jobs? >> >> Thanks for sharing your thoughts and experiences! >> >
Re: Spark streaming multiple kafka topic doesn't work at-least-once
I'm using DirectStream as one stream for all topics. I check the offset ranges from Kafka Manager and don't see any significant deltas. On Tue, Jan 24, 2017 at 4:42 AM, Cody Koeninger wrote: > Are you using a receiver-based or direct stream? > > Are you doing 1 stream per topic, or 1 stream for all topics? > > If you're using the direct stream, the actual topics and offset ranges > should be visible in the logs, so you should be able to see more > detail about what's happening (e.g. all topics still being > processed but offsets significantly behind, vs. only certain topics > being processed but keeping up with the latest offsets). > > On Mon, Jan 23, 2017 at 3:14 PM, hakanilter wrote: > > Hi everyone, > > > > I have a spark (1.6.0-cdh5.7.1) streaming job which receives data from > > multiple kafka topics. After starting the job, everything works fine at first > > (like 700 req/sec), but after a while (a couple of days or a week) it starts > > processing only part of the data (like 350 req/sec). When I check the > > kafka topics, I can see that there are still 700 req/sec coming into the > > topics. I don't see any errors, exceptions or any other problem. The job > > works fine when I start the same code with just a single kafka topic. > > > > Do you have any idea or a clue to understand the problem? > > > > Thanks. > >
Re:
Hi Keith, Can you try including a clean-up step at the end of the job, before the driver tears down the SparkContext, to clean the necessary files through some regex patterns or so, on all nodes in your cluster by default? If the files are not available on a few nodes, that should not be a problem, should it? On Sun, Jan 22, 2017 at 1:26 AM, Mark Hamstra wrote: > I wouldn't say that Executors are dumb, but there are some pretty clear > divisions of concepts and responsibilities across the different pieces of > the Spark architecture. A Job is a concept that is completely unknown to an > Executor, which deals instead with just the Tasks that it is given. So you > are correct, Jacek, that any notification of a Job end has to come from the > Driver. > > On Sat, Jan 21, 2017 at 2:10 AM, Jacek Laskowski wrote: > >> Executors are "dumb", i.e. they execute TaskRunners for tasks >> and... that's it. >> >> Your logic should be on the driver, which can intercept events >> and... trigger cleanup. >> >> I don't think there's another way to do it. >> >> Pozdrawiam, >> Jacek Laskowski >> >> https://medium.com/@jaceklaskowski/ >> Mastering Apache Spark 2.0 https://bit.ly/mastering-apache-spark >> Follow me at https://twitter.com/jaceklaskowski >> >> >> On Fri, Jan 20, 2017 at 10:47 PM, Keith Chapman >> wrote: >> > Hi Jacek, >> > >> > I've looked at SparkListener and tried it; I see it getting fired on the >> > master, but I don't see it getting fired on the workers in a cluster. >> > >> > Regards, >> > Keith. >> > >> > http://keith-chapman.com >> > >> > On Fri, Jan 20, 2017 at 11:09 AM, Jacek Laskowski >> wrote: >> >> >> >> Hi, >> >> >> >> (redirecting to users as it has nothing to do with Spark project >> >> development) >> >> >> >> Monitor jobs and stages using SparkListener and submit cleanup jobs >> where >> >> a condition holds. >> >> >> >> Jacek >> >> >> >> On 20 Jan 2017 3:57 a.m., "Keith Chapman" >> wrote: >> >>> >> >>> Hi, >> >>> >> >>> Is it possible for an executor (or slave) to know when an actual job >> >>> ends? I'm running spark on a cluster (with yarn) and my workers >> create some >> >>> temporary files that I would like to clean up once the job ends. Is >> there a >> >>> way for the worker to detect that a job has finished? I tried doing >> it in >> >>> the JobProgressListener, but it does not seem to work in a cluster. >> The event >> >>> is not triggered in the worker. >> >>> >> >>> Regards, >> >>> Keith. >> >>> >> >>> http://keith-chapman.com >> > >> > >>
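A minimal sketch of the driver-side listener approach described in this thread; what happens inside onJobEnd (here just a log line) is application-specific, and deleting worker-local files from the driver would still need an explicit mechanism such as a small cleanup job:

    import org.apache.spark.scheduler.{SparkListener, SparkListenerJobEnd}

    sc.addSparkListener(new SparkListener {
      // Fires on the driver when a job finishes; executors never receive this
      // event, which matches the observation above that it doesn't fire on workers
      override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = {
        println(s"Job ${jobEnd.jobId} ended at ${jobEnd.time}")
      }
    })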
Fwd: hc.sql("hive udf query") not able to convert Caused by: org.apache.hadoop.hive.ql.metadata.HiveException: java.lang.ClassCastException: java.lang.Long cannot be cast to org.apache.hadoop.io.LongW
Hi Team, I am trying to keep below code in get method and calling that get mthod in another hive UDF and running the hive UDF using Hive Context.sql procedure.. switch (f) { case "double" : return ((DoubleWritable)obj).get(); case "bigint" : return ((LongWritable)obj).get(); case "string" : return ((Text)obj).toString(); default : return obj; } } Suprisingly only LongWritable and Text convrsions are throwing error but DoubleWritable is working So I tried changing below code to switch (f) { case "double" : return ((DoubleWritable)obj).get(); case "bigint" : return ((DoubleWritable)obj).get(); case "string" : return ((Text)obj).toString(); default : return obj; } } Still its throws error saying Java.Lang.Long cant be convrted to org.apache.hadoop.hive.serde2.io.DoubleWritable its working fine on hive but throwing error on spark-sql I am importing the below packages. import java.util.*; import org.apache.hadoop.hive.serde2.objectinspector.*; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.hive.serde2.io.DoubleWritable; .Please let me know why it is making issue in spark when perfectly running fine on hive
hc.sql("hive udf query") not able to convert Caused by: org.apache.hadoop.hive.ql.metadata.HiveException: java.lang.ClassCastException: java.lang.Long cannot be cast to org.apache.hadoop.io.LongWritab
Hi Team, I am trying to keep the below code in a get method, calling that get method in another hive UDF, and running the hive UDF with HiveContext.sql(...). switch (f) { case "double" : return ((DoubleWritable)obj).get(); case "bigint" : return ((LongWritable)obj).get(); case "string" : return ((Text)obj).toString(); default : return obj; } } Surprisingly, only the LongWritable and Text conversions are throwing errors; DoubleWritable is working. So I tried changing the code to switch (f) { case "double" : return ((DoubleWritable)obj).get(); case "bigint" : return ((DoubleWritable)obj).get(); case "string" : return ((Text)obj).toString(); default : return obj; } } It still throws an error saying java.lang.Long cannot be cast to org.apache.hadoop.hive.serde2.io.DoubleWritable. It works fine on hive but throws this error on spark-sql. I am importing the below packages: import java.util.*; import org.apache.hadoop.hive.serde2.objectinspector.*; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.hive.serde2.io.DoubleWritable; Please let me know why this is an issue in spark when it runs perfectly fine on hive.
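A hedged sketch of the usual fix, inferred from the error rather than confirmed in this thread: Spark SQL tends to hand Hive UDFs plain Java values (java.lang.Long, String) where Hive hands Writable wrappers, so the conversion should tolerate both. Shown in Scala for consistency with the rest of this digest; the same instanceof checks translate directly to the Java switch above:

    import org.apache.hadoop.hive.serde2.io.DoubleWritable
    import org.apache.hadoop.io.{LongWritable, Text}

    // Accept either the Writable wrapper (Hive) or the plain Java value (Spark SQL)
    def unwrap(obj: Any): Any = obj match {
      case w: LongWritable     => w.get
      case l: java.lang.Long   => l.longValue
      case w: DoubleWritable   => w.get
      case d: java.lang.Double => d.doubleValue
      case t: Text             => t.toString
      case s: String           => s
      case other               => other
    }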
Re: Using mapWithState without a checkpoint
Even if you don't need the checkpointing data for recovery, "mapWithState" still needs to use "checkpoint" to cut off the RDD lineage. On Mon, Jan 23, 2017 at 12:30 AM, shyla deshpande wrote: > Hello spark users, > > I do have the same question as Daniel. > > I would like to save the state in Cassandra and on failure recover using > the initialState. If someone has already tried this, please share your > experience and sample code. > > Thanks. > > On Thu, Nov 17, 2016 at 9:45 AM, Daniel Haviv < > daniel.ha...@veracity-group.com> wrote: > >> Hi, >> Is it possible to use mapWithState without checkpointing at all ? >> I'd rather have the whole application fail, restart and reload an >> initialState RDD than pay for checkpointing every 10 batches. >> >> Thank you, >> Daniel >> > >
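A minimal sketch of what that means in practice (the path and interval are illustrative; by default mapWithState checkpoints at 10x the batch interval, which matches Daniel's "every 10 batches"):

    import org.apache.spark.streaming.Seconds

    ssc.checkpoint("hdfs:///tmp/app-checkpoints") // required for mapWithState, even with initialState

    val mapped = pairs.mapWithState(spec)
    // Optionally stretch the checkpoint interval so its cost is paid less often:
    mapped.checkpoint(Seconds(100)) // e.g. 10x a 10-second batch interval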
Aggregator mutate b1 in place in merge
Looking at the docs for org.apache.spark.sql.expressions.Aggregator, it says for the reduce method: "For performance, the function may modify `b` and return it instead of constructing new object for b." It makes no such comment for the merge method. This is surprising to me because I know that for PairRDDFunctions.aggregateByKey mutation is allowed in both seqOp and combOp (which are the equivalents of reduce and merge in Aggregator). Is it safe to mutate b1 and return it in Aggregator.merge?
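For concreteness, a sketch of the pattern in question; the docs' guarantee quoted above covers the reduce line, while the merge line is exactly the open case being asked about (the buffer is a mutable case class for illustration):

    import org.apache.spark.sql.{Encoder, Encoders}
    import org.apache.spark.sql.expressions.Aggregator

    case class SumBuf(var sum: Long)

    object LongSum extends Aggregator[Long, SumBuf, Long] {
      def zero: SumBuf = SumBuf(0L)
      def reduce(b: SumBuf, a: Long): SumBuf = { b.sum += a; b }           // documented as allowed
      def merge(b1: SumBuf, b2: SumBuf): SumBuf = { b1.sum += b2.sum; b1 } // the case being asked about
      def finish(b: SumBuf): Long = b.sum
      def bufferEncoder: Encoder[SumBuf] = Encoders.product[SumBuf]
      def outputEncoder: Encoder[Long] = Encoders.scalaLong
    }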
Re: ScalaReflectionException (class not found) error for user class in spark 2.1.0
I get the same error using the latest Spark master branch. On Tue, Jan 17, 2017 at 6:24 PM, Koert Kuipers wrote: > And to be clear, this is not in the REPL or with Hive (both well-known > situations in which these errors arise). > > On Mon, Jan 16, 2017 at 11:51 PM, Koert Kuipers wrote: > >> I am experiencing a ScalaReflectionException when doing an >> aggregation on a spark-sql DataFrame. The error looks like this: >> >> Exception in thread "main" scala.ScalaReflectionException: class >> in JavaMirror with sun.misc.Launcher$AppClassLoader@28d93b30 >> of type class sun.misc.Launcher$AppClassLoader with classpath >> [] not found. >> at scala.reflect.internal.Mirrors$RootsBase.staticClass(Mirrors.scala:123) >> at scala.reflect.internal.Mirrors$RootsBase.staticClass(Mirrors.scala:22) >> at >> at scala.reflect.api.TypeTags$WeakTypeTagImpl.tpe$lzycompute(TypeTags.scala:232) >> at scala.reflect.api.TypeTags$WeakTypeTagImpl.tpe(TypeTags.scala:232) >> at org.apache.spark.sql.SQLImplicits$$typecreator9$1.apply(SQLImplicits.scala:127) >> at scala.reflect.api.TypeTags$WeakTypeTagImpl.tpe$lzycompute(TypeTags.scala:232) >> at scala.reflect.api.TypeTags$WeakTypeTagImpl.tpe(TypeTags.scala:232) >> at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.apply(ExpressionEncoder.scala:49) >> at org.apache.spark.sql.SQLImplicits.newProductSeqEncoder(SQLImplicits.scala:127) >> at >> >> Some things to note: >> * the classpath contains the driver-class-path I indicated when using >> spark-submit, and all the jars that spark added, but it does not contain my >> own assembly jar, which contains the missing class >> * the class that is missing is a simple case class that is only used in >> the aggregators on the executors, never driver-side >> * I am running spark 2.1.0 with java 8 on yarn, but I can reproduce the >> same error in local mode >> >> What is this classloader that excludes my jar? >> The error looks somewhat like SPARK-8470, but I am not using hive, and >> spark was not built with hive support. >> >> I can fix the error by adding my assembly jar to the driver-classpath, but >> that feels like a hack. >> >> thanks, >> koert >> >> >