Re: physical memory usage keep increasing for spark app on Yarn

2017-01-23 Thread Pavel Plotnikov
Hi Yang!

I don't know exactly why this happens, but I think either the GC can't keep up,
or the size of the data plus the additional objects created during the computation
is too big for the executor.
I also found that this problem only appears when you do some data manipulation. You
can cache your data first and, after that, write it out as one partition.
For example

val dropDF = df.drop("hh").drop("mm").drop("mode").drop("y").drop("m").drop("d")

dropDF.cache()

or

dropDF.write.mode(SaveMode.ErrorIfExists).parquet(temppath)

val dropDF = spark.read.parquet(temppath)

and then

dropDF.repartition(1).write.mode(SaveMode.ErrorIfExists).parquet(targetpath)
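
If writing a single output file is really required and the executor still gets killed,
another knob is the YARN off-heap overhead mentioned below. A hedged sketch, assuming
Spark on YARN (the 4096 MB value is only an illustration, not a recommendation):

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

val conf = new SparkConf()
  .set("spark.yarn.executor.memoryOverhead", "4096") // off-heap headroom per executor, in MB
val spark = SparkSession.builder().config(conf).getOrCreate()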

Best,

On Sun, Jan 22, 2017 at 12:31 PM Yang Cao  wrote:

> Also, do you know why this happen?
>
> On Jan 20, 2017, at 18:23, Pavel Plotnikov
> wrote:
>
> Hi Yang,
> I have faced the same problem on Mesos, and to work around it I
> usually increase the partition number. In the last step of your code you reduce the
> number of partitions to 1; try a bigger value, maybe that solves the
> problem.
>
> Cheers,
> Pavel
>
> On Fri, Jan 20, 2017 at 12:35 PM Yang Cao  wrote:
>
> Hi all,
>
> I am running a Spark application in YARN-client mode with 6 executors
> (each with 4 cores, executor memory = 6G and overhead = 4G, Spark version:
> 1.6.3 / 2.1.0). I find that my executor memory keeps increasing until it gets
> killed by the node manager, with a message telling me to boost
> spark.yarn.executor.memoryOverhead.
> I know that this parameter mainly controls the amount of memory allocated
> off-heap, but I don’t know when and how the Spark engine uses that part
> of memory. Also, increasing that part of memory does not always solve my
> problem: sometimes it works, sometimes not. It tends to be useless when the
> input data is large.
>
> FYI, my app’s logic is quite simple. It is meant to combine the small files
> generated in a single day (one directory per day) into a single file and
> write it back to HDFS. Here is the core code:
>
> val df = spark.read.parquet(originpath).filter(s"m = ${ts.month} AND d = 
> ${ts.day}").coalesce(400)
>
> val dropDF = 
> df.drop("hh").drop("mm").drop("mode").drop("y").drop("m").drop("d")
>
> dropDF.repartition(1).write.mode(SaveMode.ErrorIfExists).parquet(targetpath)
>
> The source may have hundreds to thousands of partitions, and the
> total Parquet data is around 1 to 5 GB. I also find that in the step that
> shuffle-reads data from different machines, the size of the shuffle read is
> about 4 times larger than the input size, which is weird, or follows some
> principle I don’t know.
>
> Anyway, I have done some searching myself on this problem. Some articles say
> that it’s the direct buffer memory (which I don’t set myself). Some articles say
> that people solved it with more frequent full GCs. I also found one
> person on SO with a very similar situation:
> http://stackoverflow.com/questions/31646679/ever-increasing-physical-memory-for-a-spark-application-in-yarn
> This guy claimed that it’s a bug in Parquet, but a comment questioned him.
> People on this mailing list may also have received an email hours ago from
> blondowski, who described this problem while writing JSON:
> http://apache-spark-user-list.1001560.n3.nabble.com/Executors-running-out-of-memory-tt28325.html#none
>
> So it looks like a common question across different output formats. I hope
> someone with experience with this problem can explain why it happens and
> what a reliable way to solve it is.
>
> Best,
>
>
>


Re: Using mapWithState without a checkpoint

2017-01-23 Thread shyla deshpande
Hello spark users,

I do have the same question as Daniel.

I would like to save the state in Cassandra and, on failure, recover using
the initialState. If someone has already tried this, please share your
experience and sample code.
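
For reference, a minimal sketch of this approach, assuming the spark-cassandra-connector
and a hypothetical word_counts table (the keyspace/table names, ssc and the wordPairs
DStream are placeholders):

import com.datastax.spark.connector._
import org.apache.spark.streaming.{State, StateSpec}

// previously saved (word, count) pairs from Cassandra become the initial state
val initialRDD = ssc.sparkContext
  .cassandraTable[(String, Long)]("my_keyspace", "word_counts")

val mappingFunc = (word: String, one: Option[Long], state: State[Long]) => {
  val newCount = state.getOption.getOrElse(0L) + one.getOrElse(0L)
  state.update(newCount)
  (word, newCount)
}

val stateStream = wordPairs.mapWithState(
  StateSpec.function(mappingFunc).initialState(initialRDD))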

Thanks.

On Thu, Nov 17, 2016 at 9:45 AM, Daniel Haviv <
daniel.ha...@veracity-group.com> wrote:

> Hi,
> Is it possible to use mapWithState without checkpointing at all ?
> I'd rather have the whole application fail, restart and reload an
> initialState RDD than pay for checkpointing every 10 batches.
>
> Thank you,
> Daniel
>


How to write Spark DataFrame to salesforce table

2017-01-23 Thread Niraj Kumar
Hi

I am trying to write a DataFrame into a Salesforce table.

facWriteBack.write.mode("append").jdbc(s"jdbc:salesforce:UseSandbox=True;User=;Password=;Security
 Token=;Timeout=0", "SFORCE.")

Am I doing it correctly?
Also, could you suggest how to upsert a record?


Thanks in advance
Niraj



Spark Streaming proactive monitoring

2017-01-23 Thread Lan Jiang
Hi, there

From the Spark UI, we can monitor the following two metrics:

• Processing Time - The time to process each batch of data.
• Scheduling Delay - the time a batch waits in a queue for the 
processing of previous batches to finish.

However, what is the best way to monitor them proactively? For example, if
processing time/scheduling delay exceeds a certain threshold, send an alert to the
admin/developer?

Lan
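
One hedged option is to register a StreamingListener on the StreamingContext; the
listener and the BatchInfo fields below are part of Spark's streaming scheduler API,
while ssc, alert() and the thresholds are placeholders:

import org.apache.spark.streaming.scheduler._

def alert(msg: String): Unit = println(msg)  // placeholder: hook up email/paging here

ssc.addStreamingListener(new StreamingListener {
  override def onBatchCompleted(batch: StreamingListenerBatchCompleted): Unit = {
    val info = batch.batchInfo
    val processingMs = info.processingDelay.getOrElse(0L)
    val schedulingMs = info.schedulingDelay.getOrElse(0L)
    if (processingMs > 10000L || schedulingMs > 5000L)  // example thresholds only
      alert(s"Batch ${info.batchTime}: processing=$processingMs ms, scheduling=$schedulingMs ms")
  }
})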



Re: Setting startingOffsets to earliest in structured streaming never catches up

2017-01-23 Thread Michael Armbrust
+1 to Ryan's suggestion of setting maxOffsetsPerTrigger.  This way you can
at least see how quickly it is making progress towards catching up.
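
For reference, a hedged sketch of where that option goes on the Kafka source (the
broker address, topic name and limit are placeholders):

val stream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:9092")
  .option("subscribe", "my-topic")
  .option("startingOffsets", "earliest")
  .option("maxOffsetsPerTrigger", "100000")  // cap the records pulled per micro-batch
  .load()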

On Sun, Jan 22, 2017 at 7:02 PM, Timothy Chan  wrote:

> I'm using version 2.02.
>
> The difference I see between using latest and earliest is a series of jobs
> that take less than a second vs. one job that goes on for over 24 hours.
>
> On Sun, Jan 22, 2017 at 6:54 PM Shixiong(Ryan) Zhu <
> shixi...@databricks.com> wrote:
>
>> Which Spark version are you using? If you are using 2.1.0, could you use
>> the monitoring APIs (http://spark.apache.org/docs/
>> latest/structured-streaming-programming-guide.html#
>> monitoring-streaming-queries) to check the input rate and the processing
>> rate? One possible issue is that the Kafka source launched a pretty large
>> batch and it took too long to finish it. If so, you can use
>> "maxOffsetsPerTrigger" option to limit the data size in a batch in order to
>> observe the progress.
>>
>> On Sun, Jan 22, 2017 at 10:22 AM, Timothy Chan 
>> wrote:
>>
>> I'm running my structured streaming jobs in EMR. We were thinking a worst
>> case scenario recovery situation would be to spin up another cluster and
>> set startingOffsets to earliest (our Kafka cluster has a retention policy
>> of 7 days).
>>
>> My observation is that the job never catches up to latest. This is not
>> acceptable. I've set the number of partitions for the topic to 6. I've
>> tried using a cluster of 4 in EMR.
>>
>> The producer rate for this topic is 4 events/second. Does anyone have any
>> suggestions on what I can do to have my consumer catch up to latest faster?
>>
>>
>>


Re: why does spark web UI keeps changing its port?

2017-01-23 Thread Marcelo Vanzin
No. Each app has its own UI which runs (starting on) port 4040.

On Mon, Jan 23, 2017 at 12:05 PM, kant kodali  wrote:
> I am using standalone mode so wouldn't be 8080 for my app web ui as well?
> There is nothing running on 4040 in my cluster.
>
> http://spark.apache.org/docs/latest/security.html#standalone-mode-only
>
> On Mon, Jan 23, 2017 at 11:51 AM, Marcelo Vanzin 
> wrote:
>>
>> That's the Master, whose default port is 8080 (not 4040). The default
>> port for the app's UI is 4040.
>>
>> On Mon, Jan 23, 2017 at 11:47 AM, kant kodali  wrote:
>> > I am not sure why Spark web UI keeps changing its port every time I
>> > restart
>> > a cluster? how can I make it run always on one port? I did make sure
>> > there
>> > is no process running on 4040(spark default web ui port) however it
>> > still
>> > starts at 8080. any ideas?
>> >
>> >
>> > MasterWebUI: Bound MasterWebUI to 0.0.0.0, and started at
>> > http://x.x.x.x:8080
>> >
>> >
>> > Thanks!
>>
>>
>>
>> --
>> Marcelo
>
>



-- 
Marcelo




Re: why does spark web UI keeps changing its port?

2017-01-23 Thread kant kodali
I am using standalone mode, so wouldn't it be 8080 for my app web UI as well?
There is nothing running on 4040 in my cluster.

http://spark.apache.org/docs/latest/security.html#standalone-mode-only

On Mon, Jan 23, 2017 at 11:51 AM, Marcelo Vanzin 
wrote:

> That's the Master, whose default port is 8080 (not 4040). The default
> port for the app's UI is 4040.
>
> On Mon, Jan 23, 2017 at 11:47 AM, kant kodali  wrote:
> > I am not sure why Spark web UI keeps changing its port every time I
> restart
> > a cluster? how can I make it run always on one port? I did make sure
> there
> > is no process running on 4040(spark default web ui port) however it still
> > starts at 8080. any ideas?
> >
> >
> > MasterWebUI: Bound MasterWebUI to 0.0.0.0, and started at
> > http://x.x.x.x:8080
> >
> >
> > Thanks!
>
>
>
> --
> Marcelo
>


Re: why does spark web UI keeps changing its port?

2017-01-23 Thread kant kodali
Hmm.. I guess in that case my assumption about "app" is wrong. I thought the
app is the client jar that you submit, no? If so, say I submit multiple jobs,
do I then get two UIs?

On Mon, Jan 23, 2017 at 12:07 PM, Marcelo Vanzin 
wrote:

> No. Each app has its own UI which runs (starting on) port 4040.
>
> On Mon, Jan 23, 2017 at 12:05 PM, kant kodali  wrote:
> > I am using standalone mode so wouldn't be 8080 for my app web ui as well?
> > There is nothing running on 4040 in my cluster.
> >
> > http://spark.apache.org/docs/latest/security.html#standalone-mode-only
> >
> > On Mon, Jan 23, 2017 at 11:51 AM, Marcelo Vanzin 
> > wrote:
> >>
> >> That's the Master, whose default port is 8080 (not 4040). The default
> >> port for the app's UI is 4040.
> >>
> >> On Mon, Jan 23, 2017 at 11:47 AM, kant kodali 
> wrote:
> >> > I am not sure why Spark web UI keeps changing its port every time I
> >> > restart
> >> > a cluster? how can I make it run always on one port? I did make sure
> >> > there
> >> > is no process running on 4040(spark default web ui port) however it
> >> > still
> >> > starts at 8080. any ideas?
> >> >
> >> >
> >> > MasterWebUI: Bound MasterWebUI to 0.0.0.0, and started at
> >> > http://x.x.x.x:8080
> >> >
> >> >
> >> > Thanks!
> >>
> >>
> >>
> >> --
> >> Marcelo
> >
> >
>
>
>
> --
> Marcelo
>


Re: why does spark web UI keeps changing its port?

2017-01-23 Thread Marcelo Vanzin
As I said.

Each app gets its own UI. Look at the logs printed to the output.
The port will depend on whether they're running on the same host at
the same time.

This is irrespective of how they are run.
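
If the goal is to keep an app's UI on a fixed port, one hedged option is the documented
spark.ui.port setting (the value below is arbitrary, and a second app on the same host
at the same time will still fall back to the next free port):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("my-app")            // placeholder name
  .set("spark.ui.port", "4050")    // pin the app UI port instead of the 4040 default
val sc = new SparkContext(conf)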

On Mon, Jan 23, 2017 at 12:40 PM, kant kodali  wrote:
> yes I meant submitting through spark-submit.
>
> so If I do spark-submit A.jar and spark-submit A.jar again. Do I get two
> UI's or one UI'? and which ports do they run on when using the stand alone
> mode?
>
> On Mon, Jan 23, 2017 at 12:19 PM, Marcelo Vanzin 
> wrote:
>>
>> Depends on what you mean by "job". Which is why I prefer "app", which
>> is clearer (something you submit using "spark-submit", for example).
>>
>> But really, I'm not sure what you're asking now.
>>
>> On Mon, Jan 23, 2017 at 12:15 PM, kant kodali  wrote:
>> > hmm..I guess in that case my assumption of "app" is wrong. I thought the
>> > app
>> > is a client jar that you submit. no? If so, say I submit multiple jobs
>> > then
>> > I get two UI'S?
>> >
>> > On Mon, Jan 23, 2017 at 12:07 PM, Marcelo Vanzin 
>> > wrote:
>> >>
>> >> No. Each app has its own UI which runs (starting on) port 4040.
>> >>
>> >> On Mon, Jan 23, 2017 at 12:05 PM, kant kodali 
>> >> wrote:
>> >> > I am using standalone mode so wouldn't be 8080 for my app web ui as
>> >> > well?
>> >> > There is nothing running on 4040 in my cluster.
>> >> >
>> >> >
>> >> > http://spark.apache.org/docs/latest/security.html#standalone-mode-only
>> >> >
>> >> > On Mon, Jan 23, 2017 at 11:51 AM, Marcelo Vanzin
>> >> > 
>> >> > wrote:
>> >> >>
>> >> >> That's the Master, whose default port is 8080 (not 4040). The
>> >> >> default
>> >> >> port for the app's UI is 4040.
>> >> >>
>> >> >> On Mon, Jan 23, 2017 at 11:47 AM, kant kodali 
>> >> >> wrote:
>> >> >> > I am not sure why Spark web UI keeps changing its port every time
>> >> >> > I
>> >> >> > restart
>> >> >> > a cluster? how can I make it run always on one port? I did make
>> >> >> > sure
>> >> >> > there
>> >> >> > is no process running on 4040(spark default web ui port) however
>> >> >> > it
>> >> >> > still
>> >> >> > starts at 8080. any ideas?
>> >> >> >
>> >> >> >
>> >> >> > MasterWebUI: Bound MasterWebUI to 0.0.0.0, and started at
>> >> >> > http://x.x.x.x:8080
>> >> >> >
>> >> >> >
>> >> >> > Thanks!
>> >> >>
>> >> >>
>> >> >>
>> >> >> --
>> >> >> Marcelo
>> >> >
>> >> >
>> >>
>> >>
>> >>
>> >> --
>> >> Marcelo
>> >
>> >
>>
>>
>>
>> --
>> Marcelo
>
>



-- 
Marcelo




Re: why does spark web UI keeps changing its port?

2017-01-23 Thread Marcelo Vanzin
Depends on what you mean by "job". Which is why I prefer "app", which
is clearer (something you submit using "spark-submit", for example).

But really, I'm not sure what you're asking now.

On Mon, Jan 23, 2017 at 12:15 PM, kant kodali  wrote:
> hmm..I guess in that case my assumption of "app" is wrong. I thought the app
> is a client jar that you submit. no? If so, say I submit multiple jobs then
> I get two UI'S?
>
> On Mon, Jan 23, 2017 at 12:07 PM, Marcelo Vanzin 
> wrote:
>>
>> No. Each app has its own UI which runs (starting on) port 4040.
>>
>> On Mon, Jan 23, 2017 at 12:05 PM, kant kodali  wrote:
>> > I am using standalone mode so wouldn't be 8080 for my app web ui as
>> > well?
>> > There is nothing running on 4040 in my cluster.
>> >
>> > http://spark.apache.org/docs/latest/security.html#standalone-mode-only
>> >
>> > On Mon, Jan 23, 2017 at 11:51 AM, Marcelo Vanzin 
>> > wrote:
>> >>
>> >> That's the Master, whose default port is 8080 (not 4040). The default
>> >> port for the app's UI is 4040.
>> >>
>> >> On Mon, Jan 23, 2017 at 11:47 AM, kant kodali 
>> >> wrote:
>> >> > I am not sure why Spark web UI keeps changing its port every time I
>> >> > restart
>> >> > a cluster? how can I make it run always on one port? I did make sure
>> >> > there
>> >> > is no process running on 4040(spark default web ui port) however it
>> >> > still
>> >> > starts at 8080. any ideas?
>> >> >
>> >> >
>> >> > MasterWebUI: Bound MasterWebUI to 0.0.0.0, and started at
>> >> > http://x.x.x.x:8080
>> >> >
>> >> >
>> >> > Thanks!
>> >>
>> >>
>> >>
>> >> --
>> >> Marcelo
>> >
>> >
>>
>>
>>
>> --
>> Marcelo
>
>



-- 
Marcelo




Re: why does spark web UI keeps changing its port?

2017-01-23 Thread Marcelo Vanzin
That's the Master, whose default port is 8080 (not 4040). The default
port for the app's UI is 4040.

On Mon, Jan 23, 2017 at 11:47 AM, kant kodali  wrote:
> I am not sure why Spark web UI keeps changing its port every time I restart
> a cluster? how can I make it run always on one port? I did make sure there
> is no process running on 4040(spark default web ui port) however it still
> starts at 8080. any ideas?
>
>
> MasterWebUI: Bound MasterWebUI to 0.0.0.0, and started at
> http://x.x.x.x:8080
>
>
> Thanks!



-- 
Marcelo




why does spark web UI keeps changing its port?

2017-01-23 Thread kant kodali
I am not sure why the Spark web UI keeps changing its port every time I restart
a cluster. How can I make it always run on one port? I did make sure there
is no process running on 4040 (Spark's default web UI port), however it still
starts at 8080. Any ideas?


MasterWebUI: Bound MasterWebUI to 0.0.0.0, and started at
http://x.x.x.x:8080


Thanks!


Spark streaming multiple kafka topic doesn't work at-least-once

2017-01-23 Thread hakanilter
Hi everyone,

I have a Spark (1.6.0-cdh5.7.1) streaming job which receives data from
multiple Kafka topics. After starting the job, everything works fine at first
(around 700 req/sec), but after a while (a couple of days or a week) it starts
processing only part of the data (around 350 req/sec). When I check the
Kafka topics, I can see that there are still 700 req/sec coming into the
topics. I don't see any errors, exceptions or any other problem. The job
works fine when I start the same code with just a single Kafka topic.

Do you have any idea or a clue to understand the problem? 

Thanks.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-streaming-multiple-kafka-topic-doesn-t-work-at-least-once-tp28334.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: converting timestamp column to a java.util.Date

2017-01-23 Thread Takeshi Yamamuro
Hi,

I think Spark UDFs can only handle `java.sql.Date`.
So, you need to change the return type in your UDF.

// maropu
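
A hedged sketch of that change, staying within types Spark SQL understands
(java.sql.Timestamp in, java.sql.Date out). Since java.sql.Date and java.sql.Timestamp
are subclasses of java.util.Date, code that expects a java.util.Date (e.g. the Mongo
conversion) can usually consume them directly:

import org.apache.spark.sql.functions.{col, udf}

val tstampToSqlDate = udf((ts: java.sql.Timestamp) => new java.sql.Date(ts.getTime))

sharesDf.withColumn("price", col("_c2").cast("double"))
        .withColumn("creationTime", tstampToSqlDate(col("_c1")))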

On Tue, Jan 24, 2017 at 8:18 AM, Marco Mistroni  wrote:

> HI all
>   i am trying to convert a  string column, in a Dataframe , to a
> java.util.Date but i am getting this exception
>
> [dispatcher-event-loop-0] INFO org.apache.spark.storage.BlockManagerInfo
> - Removed broadcast_0_piece0 on 169.254.2.140:53468 in memory (size: 14.3
> KB, free: 767.4 MB)
> Exception in thread "main" java.lang.UnsupportedOperationException:
> Schema for type java.util.Date is not supported
>
> here's my code
>
> val tstampToDateFunc:(java.sql.Timestamp => java.util.Date) = ts => new
> java.util.Date(ts.getTime)
> val tsampConversionFunc = udf(tstampToDateFunc)
>
> sharesDf.withColumn("price", col("_c2").cast("double"))
> .withColumn("creationTime",
> tsampConversionFunc(col("_c1")))
>
> Are there any workarounds?
> i m trying to import data into mongoDB via Spark. The source is a csv file
> where
> i have  1 timestamp column and a bunch of strings. i will need to
> convert that
> to something compatible with a mongo's ISODate
>
> kr
>  marco
>
>



-- 
---
Takeshi Yamamuro


converting timestamp column to a java.util.Date

2017-01-23 Thread Marco Mistroni
Hi all,
I am trying to convert a string column in a DataFrame to a
java.util.Date, but I am getting this exception:

[dispatcher-event-loop-0] INFO org.apache.spark.storage.BlockManagerInfo -
Removed broadcast_0_piece0 on 169.254.2.140:53468 in memory (size: 14.3 KB,
free: 767.4 MB)
Exception in thread "main" java.lang.UnsupportedOperationException: Schema
for type java.util.Date is not supported

here's my code

val tstampToDateFunc:(java.sql.Timestamp => java.util.Date) = ts => new
java.util.Date(ts.getTime)
val tsampConversionFunc = udf(tstampToDateFunc)

sharesDf.withColumn("price", col("_c2").cast("double"))
.withColumn("creationTime",
tsampConversionFunc(col("_c1")))

Are there any workarounds?
I am trying to import data into MongoDB via Spark. The source is a CSV file
with one timestamp column and a bunch of strings. I will need to convert the
timestamp to something compatible with Mongo's ISODate.

kr
 marco


Re: Do jobs fail because of other users of a cluster?

2017-01-23 Thread Matthew Dailey
In general, Java processes fail with an OutOfMemoryError when your code and
data do not fit into the memory allocated to the runtime.  In Spark, that
memory is controlled through the --executor-memory flag.
If you are running Spark on YARN, then YARN configuration will dictate the
maximum memory that your Spark executors can request.  Here is a pretty
good article about setting memory in Spark on YARN:
http://www.cloudera.com/documentation/enterprise/5-5-x/topics/cdh_ig_running_spark_on_yarn.html

If the OS were to kill your process because the system has run out of
memory, you would see an error printed to standard error that looks like
this:

Java HotSpot(TM) 64-Bit Server VM warning: INFO:
os::commit_memory(0xe232, 37601280, 0) failed;
error='Cannot allocate memory' (errno=12)
# There is insufficient memory for the Java Runtime Environment to continue.
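
For reference, the Spark-side settings above can also be set programmatically; a hedged
sketch (the values are placeholders, and on YARN they must still fit within the container
sizes YARN is configured to allow):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.executor.memory", "6g")                 // same effect as --executor-memory 6g
  .set("spark.yarn.executor.memoryOverhead", "1024")  // off-heap headroom per executor, in MB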



On Wed, Jan 18, 2017 at 10:25 AM, David Frese 
wrote:

> Hello everybody,
>
> being quite new to Spark, I am struggling a lot with OutOfMemory exceptions
> and "GC overhead limit reached" failures of my jobs, submitted from a
> spark-shell and "master yarn".
>
> Playing with --num-executors, --executor-memory and --executor-cores I
> occasionally get something done. But I'm also not the only one using the
> cluster, and it seems to me, that my jobs sometimes fail with the above
> errors, because other people have something running, or have a spark-shell
> open at that time; or at least it seems that with the same code, data and
> settings, the job sometimes completes and sometimes fails.
>
> Is that "expected behaviour"?
>
> What options/tools can be used to make the success/failure of a job
> deterministic - there a lot things out there like, 'dynamic allocation',
> Hadoop 'fair scheduler'; but very hard for a newbee to evaluate that (resp.
> make suggestions to the admins).
>
> If it cannot be made deterministic, how can I reliably distinguish the OOM
> failures that are caused by incorrect settings on my side (e.g. because my
> data does not fit into memory), and those failures that are caused by
> resource consumption/blocking from other jobs?
>
> Thanks for sharing your thoughts and experiences!
>
>
>
>
>
> --
> View this message in context: http://apache-spark-user-list.
> 1001560.n3.nabble.com/Do-jobs-fail-because-of-other-users-
> of-a-cluster-tp28318.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: tuning the spark.locality.wait

2017-01-23 Thread Matthew Dailey
This article recommends setting spark.locality.wait to 10 (milliseconds) when
using Spark Streaming, and explains why they chose that value.  If you are using
batch Spark, that value should still be a good starting point:
https://www.inovex.de/blog/247-spark-streaming-on-yarn-in-production/
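
In code this is just a configuration property; a minimal sketch (the 10ms figure is the
article's recommendation, not a universal answer; the default is 3s):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.locality.wait", "10ms")  // lower values give up data locality sooner to keep cores busy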



On Sat, Jan 21, 2017 at 5:06 AM, Cesar  wrote:

>
> I am working with datasets of the order of 200 GB using 286 cores divided
> across 143 executor. Each executor has 32 Gb (which makes every core 15
> Gb). And I am using Spark 1.6.
>
>
> I would like to tune the spark.locality.wait. Does anyone can give me a
> range on the values of spark.locality wait that I can try ?
>
>
>
> Thanks a lot !
> --
> Cesar Flores
>


How to make the state in a streaming application idempotent?

2017-01-23 Thread shyla deshpande
In a word count application that stores the count of all the words input
so far using mapWithState, how do I make sure my counts are not messed up
if I happen to read a line more than once?

Appreciate your response.

Thanks


what would be the recommended production requirements?

2017-01-23 Thread kant kodali
Hi,

I am planning to go to production using Spark standalone mode with the
following configuration, and I would like to know if I am missing something;
any other suggestions are welcome.

1) Three Spark standalone masters deployed on different nodes, using
Apache ZooKeeper for leader election.
2) Two or three worker nodes (for our workload, which needs to handle 5000
messages/sec, two worker nodes were more than sufficient in our tests, but to
be safe we may use three).
3) HDFS for storing recoverable state, WAL, checkpoints, etc., since
we are running a streaming application.
4) Some sort of monitoring and alerting framework.

Do I need anything else apart from this? What's not clear to me is how
service discovery is done. For example, right now we manually have to edit
the IP addresses of worker machines in SPARK_HOME/conf/slaves, so we have to
bring the entire cluster down. What is the most common way to solve this,
given that we don't plan on using Mesos or YARN? I know of some tools which
can help here, but I would like to know which of those tools are widely
used. Any other suggestions, in case I am missing something, are welcome.

Thanks,
kant


Re: why does spark web UI keeps changing its port?

2017-01-23 Thread kant kodali
Yes, I meant submitting through spark-submit.

So if I do spark-submit A.jar and then spark-submit A.jar again, do I get two
UIs or one? And which ports do they run on when using standalone
mode?

On Mon, Jan 23, 2017 at 12:19 PM, Marcelo Vanzin 
wrote:

> Depends on what you mean by "job". Which is why I prefer "app", which
> is clearer (something you submit using "spark-submit", for example).
>
> But really, I'm not sure what you're asking now.
>
> On Mon, Jan 23, 2017 at 12:15 PM, kant kodali  wrote:
> > hmm..I guess in that case my assumption of "app" is wrong. I thought the
> app
> > is a client jar that you submit. no? If so, say I submit multiple jobs
> then
> > I get two UI'S?
> >
> > On Mon, Jan 23, 2017 at 12:07 PM, Marcelo Vanzin 
> > wrote:
> >>
> >> No. Each app has its own UI which runs (starting on) port 4040.
> >>
> >> On Mon, Jan 23, 2017 at 12:05 PM, kant kodali 
> wrote:
> >> > I am using standalone mode so wouldn't be 8080 for my app web ui as
> >> > well?
> >> > There is nothing running on 4040 in my cluster.
> >> >
> >> > http://spark.apache.org/docs/latest/security.html#
> standalone-mode-only
> >> >
> >> > On Mon, Jan 23, 2017 at 11:51 AM, Marcelo Vanzin  >
> >> > wrote:
> >> >>
> >> >> That's the Master, whose default port is 8080 (not 4040). The default
> >> >> port for the app's UI is 4040.
> >> >>
> >> >> On Mon, Jan 23, 2017 at 11:47 AM, kant kodali 
> >> >> wrote:
> >> >> > I am not sure why Spark web UI keeps changing its port every time I
> >> >> > restart
> >> >> > a cluster? how can I make it run always on one port? I did make
> sure
> >> >> > there
> >> >> > is no process running on 4040(spark default web ui port) however it
> >> >> > still
> >> >> > starts at 8080. any ideas?
> >> >> >
> >> >> >
> >> >> > MasterWebUI: Bound MasterWebUI to 0.0.0.0, and started at
> >> >> > http://x.x.x.x:8080
> >> >> >
> >> >> >
> >> >> > Thanks!
> >> >>
> >> >>
> >> >>
> >> >> --
> >> >> Marcelo
> >> >
> >> >
> >>
> >>
> >>
> >> --
> >> Marcelo
> >
> >
>
>
>
> --
> Marcelo
>


Re: Spark streaming multiple kafka topic doesn't work at-least-once

2017-01-23 Thread Cody Koeninger
Are you using receiver-based or direct stream?

Are you doing 1 stream per topic, or 1 stream for all topics?

If you're using the direct stream, the actual topics and offset ranges
should be visible in the logs, so you should be able to see more
detail about what's happening (e.g. all topics are still being
processed but offsets are significantly behind, vs only certain topics
being processed but keeping up with latest offsets)
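
For the direct-stream case, a hedged sketch of logging the per-topic offset ranges
yourself (Spark 1.6 / Kafka 0.8 direct API; ssc, kafkaParams and the topic names are
assumed from your existing job):

import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka._

val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, Set("topicA", "topicB"))

stream.foreachRDD { rdd =>
  // log how far each topic/partition progressed in this batch
  rdd.asInstanceOf[HasOffsetRanges].offsetRanges.foreach { r =>
    println(s"${r.topic} partition ${r.partition}: ${r.fromOffset} -> ${r.untilOffset}")
  }
}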

On Mon, Jan 23, 2017 at 3:14 PM, hakanilter  wrote:
> Hi everyone,
>
> I have a spark (1.6.0-cdh5.7.1) streaming job which receives data from
> multiple kafka topics. After starting the job, everything works fine first
> (like 700 req/sec) but after a while (couples of days or a week) it starts
> processing only some part of the data (like 350 req/sec). When I check the
> kafka topics, I can see that there are still 700 req/sec coming to the
> topics. I don't see any errors, exceptions or any other problem. The job
> works fine when I start the same code with just single kafka topic.
>
> Do you have any idea or a clue to understand the problem?
>
> Thanks.
>
>
>
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-streaming-multiple-kafka-topic-doesn-t-work-at-least-once-tp28334.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>




Re: Do jobs fail because of other users of a cluster?

2017-01-23 Thread Sirisha Cheruvu
ok

On Tue, Jan 24, 2017 at 7:13 AM, Matthew Dailey 
wrote:

> In general, Java processes fail with an OutOfMemoryError when your code
> and data does not fit into the memory allocated to the runtime.  In Spark,
> that memory is controlled through the --executor-memory flag.
> If you are running Spark on YARN, then YARN configuration will dictate the
> maximum memory that your Spark executors can request.  Here is a pretty
> good article about setting memory in Spark on YARN:
> http://www.cloudera.com/documentation/enterprise/5-5-
> x/topics/cdh_ig_running_spark_on_yarn.html
>
> If the OS were to kill your process because the system has run out of
> memory, you would see an error printed to standard error that looks like
> this:
>
> Java HotSpot(TM) 64-Bit Server VM warning: INFO: 
> os::commit_memory(0xe232, 37601280, 0) failed; error='Cannot 
> allocate memory' (errno=12)
> # There is insufficient memory for the Java Runtime Environment to continue.
>
>
>
> On Wed, Jan 18, 2017 at 10:25 AM, David Frese  > wrote:
>
>> Hello everybody,
>>
>> being quite new to Spark, I am struggling a lot with OutOfMemory
>> exceptions
>> and "GC overhead limit reached" failures of my jobs, submitted from a
>> spark-shell and "master yarn".
>>
>> Playing with --num-executors, --executor-memory and --executor-cores I
>> occasionally get something done. But I'm also not the only one using the
>> cluster, and it seems to me, that my jobs sometimes fail with the above
>> errors, because other people have something running, or have a spark-shell
>> open at that time; or at least it seems that with the same code, data and
>> settings, the job sometimes completes and sometimes fails.
>>
>> Is that "expected behaviour"?
>>
>> What options/tools can be used to make the success/failure of a job
>> deterministic - there a lot things out there like, 'dynamic allocation',
>> Hadoop 'fair scheduler'; but very hard for a newbee to evaluate that
>> (resp.
>> make suggestions to the admins).
>>
>> If it cannot be made deterministic, how can I reliably distinguish the OOM
>> failures that are caused by incorrect settings on my side (e.g. because my
>> data does not fit into memory), and those failures that are caused by
>> resource consumption/blocking from other jobs?
>>
>> Thanks for sharing your thoughts and experiences!
>>
>>
>>
>>
>>
>> --
>> View this message in context: http://apache-spark-user-list.
>> 1001560.n3.nabble.com/Do-jobs-fail-because-of-other-users-of
>> -a-cluster-tp28318.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>>
>


Re: Spark streaming multiple kafka topic doesn't work at-least-once

2017-01-23 Thread Hakan İlter
I'm using DirectStream as one stream for all topics. I check the offset
ranges from Kafka Manager and don't see any significant deltas.

On Tue, Jan 24, 2017 at 4:42 AM, Cody Koeninger  wrote:

> Are you using receiver-based or direct stream?
>
> Are you doing 1 stream per topic, or 1 stream for all topics?
>
> If you're using the direct stream, the actual topics and offset ranges
> should be visible in the logs, so you should be able to see more
> detail about what's happening (e.g. all topics are still being
> processed but offsets are significantly behind, vs only certain topics
> being processed but keeping up with latest offsets)
>
> On Mon, Jan 23, 2017 at 3:14 PM, hakanilter  wrote:
> > Hi everyone,
> >
> > I have a spark (1.6.0-cdh5.7.1) streaming job which receives data from
> > multiple kafka topics. After starting the job, everything works fine
> first
> > (like 700 req/sec) but after a while (couples of days or a week) it
> starts
> > processing only some part of the data (like 350 req/sec). When I check
> the
> > kafka topics, I can see that there are still 700 req/sec coming to the
> > topics. I don't see any errors, exceptions or any other problem. The job
> > works fine when I start the same code with just single kafka topic.
> >
> > Do you have any idea or a clue to understand the problem?
> >
> > Thanks.
> >
> >
> >
> > --
> > View this message in context: http://apache-spark-user-list.
> 1001560.n3.nabble.com/Spark-streaming-multiple-kafka-
> topic-doesn-t-work-at-least-once-tp28334.html
> > Sent from the Apache Spark User List mailing list archive at Nabble.com.
> >
> > -
> > To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> >
>


Re:

2017-01-23 Thread Naveen
Hi Keith,

Can you try including a clean-up step at the end of the job, before the driver
shuts down the SparkContext, to clean the necessary files (through some regex
patterns or so) on all nodes in your cluster? If the files are not
present on a few nodes, that should not be a problem, should it?
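
For the driver-side listener approach discussed below, a hedged sketch (the body is a
placeholder; listeners fire on the driver, so files that live on worker nodes would
still need a follow-up cleanup job or an executor-side shutdown hook):

import org.apache.spark.scheduler.{SparkListener, SparkListenerJobEnd}

sc.addSparkListener(new SparkListener {
  override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = {
    // placeholder: kick off whatever cleanup is needed once this job finishes
    println(s"Job ${jobEnd.jobId} ended, running cleanup")
  }
})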



On Sun, Jan 22, 2017 at 1:26 AM, Mark Hamstra 
wrote:

> I wouldn't say that Executors are dumb, but there are some pretty clear
> divisions of concepts and responsibilities across the different pieces of
> the Spark architecture. A Job is a concept that is completely unknown to an
> Executor, which deals instead with just the Tasks that it is given.  So you
> are correct, Jacek, that any notification of a Job end has to come from the
> Driver.
>
> On Sat, Jan 21, 2017 at 2:10 AM, Jacek Laskowski  wrote:
>
>> Executors are "dumb", i.e. they execute TaskRunners for tasks
>> and...that's it.
>>
>> Your logic should be on the driver that can intercept events
>> and...trigger cleanup.
>>
>> I don't think there's another way to do it.
>>
>> Pozdrawiam,
>> Jacek Laskowski
>> 
>> https://medium.com/@jaceklaskowski/
>> Mastering Apache Spark 2.0 https://bit.ly/mastering-apache-spark
>> Follow me at https://twitter.com/jaceklaskowski
>>
>>
>> On Fri, Jan 20, 2017 at 10:47 PM, Keith Chapman 
>> wrote:
>> > Hi Jacek,
>> >
>> > I've looked at SparkListener and tried it, I see it getting fired on the
>> > master but I don't see it getting fired on the workers in a cluster.
>> >
>> > Regards,
>> > Keith.
>> >
>> > http://keith-chapman.com
>> >
>> > On Fri, Jan 20, 2017 at 11:09 AM, Jacek Laskowski 
>> wrote:
>> >>
>> >> Hi,
>> >>
>> >> (redirecting to users as it has nothing to do with Spark project
>> >> development)
>> >>
>> >> Monitor jobs and stages using SparkListener and submit cleanup jobs
>> where
>> >> a condition holds.
>> >>
>> >> Jacek
>> >>
>> >> On 20 Jan 2017 3:57 a.m., "Keith Chapman" 
>> wrote:
>> >>>
>> >>> Hi ,
>> >>>
>> >>> Is it possible for an executor (or slave) to know when an actual job
>> >>> ends? I'm running spark on a cluster (with yarn) and my workers
>> create some
>> >>> temporary files that I would like to clean up once the job ends. Is
>> there a
>> >>> way for the worker to detect that a job has finished? I tried doing
>> it in
>> >>> the JobProgressListener but it does not seem to work in a cluster.
>> The event
>> >>> is not triggered in the worker.
>> >>>
>> >>> Regards,
>> >>> Keith.
>> >>>
>> >>> http://keith-chapman.com
>> >
>> >
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>>
>


Fwd: hc.sql("hive udf query") not able to convert Caused by: org.apache.hadoop.hive.ql.metadata.HiveException: java.lang.ClassCastException: java.lang.Long cannot be cast to org.apache.hadoop.io.LongW

2017-01-23 Thread Sirisha Cheruvu
Hi Team,

I am trying to keep the code below in a get method, calling that get method from
another Hive UDF, and running the Hive UDF through HiveContext.sql:


switch (f) {
    case "double" : return ((DoubleWritable) obj).get();
    case "bigint" : return ((LongWritable) obj).get();
    case "string" : return ((Text) obj).toString();
    default       : return obj;
  }
}

Surprisingly, only the LongWritable and Text conversions throw errors, while
DoubleWritable works.
So I tried changing the code to:

switch (f) {
    case "double" : return ((DoubleWritable) obj).get();
    case "bigint" : return ((DoubleWritable) obj).get();
    case "string" : return ((Text) obj).toString();
    default       : return obj;
  }
}

It still throws an error saying java.lang.Long cannot be cast
to org.apache.hadoop.hive.serde2.io.DoubleWritable.



It works fine on Hive but throws this error on Spark SQL.

I am importing the below packages.
import java.util.*;
import org.apache.hadoop.hive.serde2.objectinspector.*;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.hive.serde2.io.DoubleWritable;

Please let me know why this causes an issue in Spark when it runs perfectly
fine on Hive.
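
Not an answer to the root cause, but one defensive workaround is to dispatch on the
runtime type of the value instead of the Hive type name, since Spark SQL appears to
hand the UDF plain java.lang.Long/String rather than Hadoop writables. A hedged sketch
of that idea (in Scala for brevity; the same instanceof checks apply in the Java UDF):

import org.apache.hadoop.hive.serde2.io.DoubleWritable
import org.apache.hadoop.io.{LongWritable, Text}

def unwrap(obj: Any): Any = obj match {
  case d: DoubleWritable   => d.get()
  case l: LongWritable     => l.get()
  case t: Text             => t.toString
  case l: java.lang.Long   => l.longValue()     // what Spark SQL seems to pass for bigint
  case d: java.lang.Double => d.doubleValue()
  case s: String           => s                 // what Spark SQL seems to pass for string
  case other               => other
}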


hc.sql("hive udf query") not able to convert Caused by: org.apache.hadoop.hive.ql.metadata.HiveException: java.lang.ClassCastException: java.lang.Long cannot be cast to org.apache.hadoop.io.LongWritab

2017-01-23 Thread Sirisha Cheruvu
Hi Team,

I am trying to keep the code below in a get method, calling that get method from
another Hive UDF, and running the Hive UDF through HiveContext.sql:


switch (f) {
    case "double" : return ((DoubleWritable) obj).get();
    case "bigint" : return ((LongWritable) obj).get();
    case "string" : return ((Text) obj).toString();
    default       : return obj;
  }
}

Surprisingly, only the LongWritable and Text conversions throw errors, while
DoubleWritable works.
So I tried changing the code to:

switch (f) {
    case "double" : return ((DoubleWritable) obj).get();
    case "bigint" : return ((DoubleWritable) obj).get();
    case "string" : return ((Text) obj).toString();
    default       : return obj;
  }
}

It still throws an error saying java.lang.Long cannot be cast
to org.apache.hadoop.hive.serde2.io.DoubleWritable.



It works fine on Hive but throws this error on Spark SQL.

I am importing the below packages.
import java.util.*;
import org.apache.hadoop.hive.serde2.objectinspector.*;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.hive.serde2.io.DoubleWritable;

Please let me know why this causes an issue in Spark when it runs perfectly
fine on Hive.


Re: Using mapWithState without a checkpoint

2017-01-23 Thread Shixiong(Ryan) Zhu
Even if you don't need the checkpointing data for recovery, "mapWithState"
still needs to use "checkpoint" to cut off the RDD lineage.

On Mon, Jan 23, 2017 at 12:30 AM, shyla deshpande 
wrote:

> Hello spark users,
>
> I do have the same question as Daniel.
>
> I would like to save the state in Cassandra and on failure recover using
> the initialState. If some one has already tried this, please share your
> experience and sample code.
>
> Thanks.
>
> On Thu, Nov 17, 2016 at 9:45 AM, Daniel Haviv <
> daniel.ha...@veracity-group.com> wrote:
>
>> Hi,
>> Is it possible to use mapWithState without checkpointing at all ?
>> I'd rather have the whole application fail, restart and reload an
>> initialState RDD than pay for checkpointing every 10 batches.
>>
>> Thank you,
>> Daniel
>>
>
>


Aggregator mutate b1 in place in merge

2017-01-23 Thread Koert Kuipers
Looking at the docs for org.apache.spark.sql.expressions.Aggregator, it says
for the reduce method: "For performance, the function may modify `b` and return
it instead of constructing new object for b."

It makes no such comment for the merge method.

This is surprising to me, because I know that for
PairRDDFunctions.aggregateByKey mutation is allowed in both seqOp and
combOp (which are the equivalents of reduce and merge in Aggregator).

Is it safe to mutate b1 and return it in Aggregator.merge?


Re: ScalaReflectionException (class not found) error for user class in spark 2.1.0

2017-01-23 Thread Koert Kuipers
I get the same error using the latest Spark master branch.

On Tue, Jan 17, 2017 at 6:24 PM, Koert Kuipers  wrote:

> and to be clear, this is not in the REPL or with Hive (both well known
> situations in which these errors arise)
>
> On Mon, Jan 16, 2017 at 11:51 PM, Koert Kuipers  wrote:
>
>> i am experiencing a ScalaReflectionException exception when doing an
>> aggregation on a spark-sql DataFrame. the error looks like this:
>>
>> Exception in thread "main" scala.ScalaReflectionException: class
>>  in JavaMirror with sun.misc.Launcher$AppClassLoader@28d93b30
>> of type class sun.misc.Launcher$AppClassLoader with classpath
>> [] not found.
>> at scala.reflect.internal.Mirrors$RootsBase.staticClass(Mirrors
>> .scala:123)
>> at scala.reflect.internal.Mirrors$RootsBase.staticClass(Mirrors
>> .scala:22)
>> at 
>> at scala.reflect.api.TypeTags$WeakTypeTagImpl.tpe$lzycompute(
>> TypeTags.scala:232)
>> at scala.reflect.api.TypeTags$WeakTypeTagImpl.tpe(TypeTags.scala:232)
>> at org.apache.spark.sql.SQLImplicits$$typecreator9$1.apply(
>> SQLImplicits.scala:127)
>> at scala.reflect.api.TypeTags$WeakTypeTagImpl.tpe$lzycompute(
>> TypeTags.scala:232)
>> at scala.reflect.api.TypeTags$WeakTypeTagImpl.tpe(TypeTags.scala:232)
>> at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.ap
>> ply(ExpressionEncoder.scala:49)
>> at org.apache.spark.sql.SQLImplicits.newProductSeqEncoder(SQLIm
>> plicits.scala:127)
>> at 
>>
>>
>> some things to note:
>> *  contains driver-class-path as indicated by me using
>> spark-submit, and all the jars that spark added. but it does not contain my
>> own assembly jar which contains 
>> * the class that is missing is a simple case class that is only used in
>> the aggregators on the executors, never driver-side
>> * i am running spark 2.1.0 with java 8 on yarn, but i can reproduce the
>> same error in local mode
>>
>> what is this classloader that excludes my jar?
>> the error looks somewhat like SPARK-8470, but i am not using hive, and
>> spark was not build with hive support.
>>
>> i can fix the error by adding my assembly jar to driver-classpath, but
>> that feels like a hack.
>>
>> thanks,
>> koert
>>
>>
>