Streaming job, catch exceptions

2019-05-11 Thread Behroz Sikander
Hello,
I am using Spark 2.2.1 with standalone resource manager.

I have a streaming job in which, from time to time, jobs are aborted with the
following exception. The underlying causes differ, e.g.
FileNotFoundException, NullPointerException, etc.

org.apache.spark.SparkException: Job aborted due to stage failure:
Task 7 in stage 0.0 failed 4 times, most recent failure: Lost task 7.3
in stage 0.0.

These exceptions are printed in driver logs.

I have a try/catch around my streaming job. Strangely, sometimes the above
exceptions are printed in the logs but my try/catch block never catches them,
while at other times it does. In both cases, the job continues to process
data.

I am trying to understand this behavior: in which cases will I be able to
catch the exception?

I have tried to reproduce this with something like rdd.map(x => 1/0).print(),
but failed: I can see the exception in the driver logs, but my catch block
never catches it.
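
To make the question concrete, here is a minimal sketch of the kind of
wrapping I mean (the socket source and the map are placeholders for my real
pipeline). The inner try/catch sits around the action inside foreachRDD, which
runs on the driver in the streaming job thread; the outer one sits around the
blocking awaitTermination() call, where scheduler-reported errors may also
surface:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object StreamingCatchSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("streaming-catch-sketch")
    val ssc = new StreamingContext(conf, Seconds(10))

    // Placeholder input; the real job reads from a receiver-based source.
    val lines = ssc.socketTextStream("localhost", 9999)

    lines.foreachRDD { rdd =>
      try {
        // The action runs here, on the driver, so a SparkException from a
        // failed/aborted batch job can be caught at this level.
        rdd.map(x => x.length).count()
      } catch {
        case e: Exception => println(s"caught inside foreachRDD: $e")
      }
    }

    ssc.start()
    try {
      // Errors reported to the streaming context may also be rethrown from
      // this blocking call.
      ssc.awaitTermination()
    } catch {
      case e: Exception => println(s"caught around awaitTermination: $e")
    }
  }
}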

Regards,
Behroz


[SparkLauncher] stateChanged event not received in standalone cluster mode

2018-06-06 Thread Behroz Sikander
I have a client application which launches multiple jobs in a Spark cluster
using SparkLauncher. I am using *standalone cluster mode*. Launching jobs has
worked fine so far; I use launcher.startApplication() to launch them.

But now I have a requirement to track the state of my driver process. I added
a listener implementing SparkAppHandle.Listener, but I don't receive any
events. I am following the approach described here:
https://www.linkedin.com/pulse/spark-launcher-amol-kale

*I tried the same code in client mode and I receive all the events as
expected.*

So I am guessing that something needs to be done differently in cluster mode.
Is there an example for cluster mode?
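
For reference, this is roughly the wiring I have, reduced to a sketch (the jar
path, main class and master URL below are placeholders):

import org.apache.spark.launcher.{SparkAppHandle, SparkLauncher}

object LauncherSketch {
  def main(args: Array[String]): Unit = {
    val handle = new SparkLauncher()
      .setAppResource("/path/to/my-driver.jar")   // placeholder jar
      .setMainClass("com.example.MyDriver")       // placeholder main class
      .setMaster("spark://master-host:7077")      // placeholder standalone master
      .setDeployMode("cluster")                   // in "client" mode I get events; in "cluster" mode I don't
      .startApplication(new SparkAppHandle.Listener {
        override def stateChanged(h: SparkAppHandle): Unit =
          println(s"stateChanged: ${h.getState} (appId=${h.getAppId})")
        override def infoChanged(h: SparkAppHandle): Unit =
          println(s"infoChanged: appId=${h.getAppId}")
      })

    // Keep the launcher JVM alive so that state transitions can be observed.
    while (!handle.getState.isFinal) Thread.sleep(1000)
  }
}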

Regards,
Behroz


Properly stop applications or jobs within the application

2018-03-05 Thread Behroz Sikander
Hello,
We are using spark-jobserver to spawn jobs in our Spark cluster. We have
recently faced issues with zombie jobs in the cluster. This normally happens
when a job accesses external resources like Kafka/C* and something goes wrong
while consuming them, for example when a topic that is being consumed is
suddenly deleted in Kafka, or the connection to the whole Kafka cluster
breaks.

Within spark-jobserver, we have the option to delete the context or the jobs
in such scenarios.
When we delete a job, internally context.cancelJobGroup() is used.
When we delete a context, internally context.stop(true, true) is executed.

In both cases, even if we delete the job/context, the application on the
Spark cluster is still running (sometimes) and some jobs are still being
executed within Spark.
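
For reference, here is a minimal, self-contained sketch of how I understand
these two hooks (the group id and the workload are placeholders, not the
actual spark-jobserver code). My understanding is that cancelJobGroup() only
affects jobs submitted under a matching setJobGroup(), and that the
interruptOnCancel flag decides whether task threads blocked on external I/O
(like a stuck Kafka consumer) actually get interrupted:

import org.apache.spark.{SparkConf, SparkContext}

object CancelSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("cancel-sketch"))

    val runner = new Thread(new Runnable {
      override def run(): Unit = {
        // The group applies to jobs submitted from this thread.
        // interruptOnCancel = true asks Spark to interrupt the task threads
        // on cancellation, which matters when tasks are blocked on external
        // I/O and would otherwise never notice the cancellation.
        sc.setJobGroup("jobserver-job-42", "placeholder job", interruptOnCancel = true)
        try {
          sc.parallelize(1 to 1000000, 8).map { i => Thread.sleep(1); i }.count()
        } catch {
          case e: Exception => println(s"job ended with: $e")
        }
      }
    })
    runner.start()

    Thread.sleep(5000)
    // What a "delete job" request boils down to:
    sc.cancelJobGroup("jobserver-job-42")
    runner.join()

    // What a "delete context" request boils down to:
    sc.stop()
  }
}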

Here are the logs from one such scenario. The context was asked to stop, but
it kept on running and became a zombie.

2018-02-28 15:36:50,931 INFO ForkJoinPool-3-worker-13
org.apache.kafka.common.utils.AppInfoParser []: Kafka version :
0.11.0.1-SNAPSHOT
2018-02-28 15:36:50,931 INFO ForkJoinPool-3-worker-13
org.apache.kafka.common.utils.AppInfoParser []: Kafka commitId :
de8225b66d494cd
2018-02-28 15:36:51,144 INFO dispatcher-event-loop-5
org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend$DriverEndpoint
[]: Registered executor NettyRpcEndpointRef(null) (10.10.10.15:46224)
with ID 1
2018-02-28 15:38:58,254 WARN ForkJoinPool-3-worker-13
org.apache.kafka.clients.NetworkClient []: Connection to node -3 could
not be established. Broker may not be available.
2018-02-28 15:41:05,485 WARN ForkJoinPool-3-worker-13
org.apache.kafka.clients.NetworkClient []: Connection to node -2 could
not be established. Broker may not be available.
2018-02-28 15:42:07,074 WARN JobServer-akka.actor.default-dispatcher-3
akka.cluster.ClusterCoreDaemon []: Cluster Node
[akka.tcp://JobServer@127.0.0.1:43319] - Marking node(s) as
UNREACHABLE [Member(address = akka.tcp://JobServer@127.0.0.1:37343,
status = Up)]. Node roles [manager]


Later, at some point, we see the following logs. It seems that none of the
Kafka nodes were reachable from the Spark job. The job kept retrying and
became a zombie.

2018-02-28 15:43:12,717 WARN ForkJoinPool-3-worker-13
org.apache.kafka.clients.NetworkClient []: Connection to node -3 could
not be established. Broker may not be available.
2018-02-28 15:45:19,949 WARN ForkJoinPool-3-worker-13
org.apache.kafka.clients.NetworkClient []: Connection to node -1 could
not be established. Broker may not be available.
2018-02-28 15:47:27,180 WARN ForkJoinPool-3-worker-13
org.apache.kafka.clients.NetworkClient []: Connection to node -2 could
not be established. Broker may not be available.
2018-02-28 15:49:34,412 WARN ForkJoinPool-3-worker-13
org.apache.kafka.clients.NetworkClient []: Connection to node -3 could
not be established. Broker may not be available.
2018-02-28 15:51:41,644 WARN ForkJoinPool-3-worker-13
org.apache.kafka.clients.NetworkClient []: Connection to node -1 could
not be established. Broker may not be available.
2018-02-28 15:53:48,877 WARN ForkJoinPool-3-worker-13
org.apache.kafka.clients.NetworkClient []: Connection to node -2 could
not be established. Broker may not be available.
2018-02-28 15:55:56,109 WARN ForkJoinPool-3-worker-13
org.apache.kafka.clients.NetworkClient []: Connection to node -1 could
not be established. Broker may not be available.
2018-02-28 15:58:03,340 WARN ForkJoinPool-3-worker-13
org.apache.kafka.clients.NetworkClient []: Connection to node -2 could
not be established. Broker may not be available.
2018-02-28 16:00:10,572 WARN ForkJoinPool-3-worker-13
org.apache.kafka.clients.NetworkClient []: Connection to node -3 could
not be established. Broker may not be available.
2018-02-28 16:02:17,804 WARN ForkJoinPool-3-worker-13
org.apache.kafka.clients.NetworkClient []: Connection to node -1 could
not be established. Broker may not be available.




Similar to this, we have another scenario with zombie contexts. The logs are
in the gist below:
https://gist.github.com/bsikander/697d85e2352a650437a922752328a90f

In the gist, you can see that the topic was not created but the job tried to
use it. When we then tried to delete the job, it became a zombie and kept
printing:
"Block rdd_13011_0 already exists on this machine; not re-adding it"


So, my question is: what is the right way to kill the jobs running within a
context, or the context/application itself, without ending up with these
zombies?


Regards,
Behroz


Programmatically get status of job (WAITING/RUNNING)

2017-10-30 Thread Behroz Sikander
Hi,

I have a Spark cluster to which I programmatically submit jobs in client
mode. Under the hood, I am using spark-submit.

If the cluster is overloaded and I start a context, the driver JVM keeps
waiting for executors. The executors stay in a waiting state because the
cluster does not have enough resources. Here are the messages from the driver
logs:

2017-10-27 13:20:15,260 WARN Timer-0
org.apache.spark.scheduler.TaskSchedulerImpl []: Initial job has not
accepted any resources; check your cluster UI to ensure that workers
are registered and have sufficient resources
2017-10-27 13:20:30,259 WARN Timer-0
org.apache.spark.scheduler.TaskSchedulerImpl []: Initial job has not
accepted any resources; check your cluster UI to ensure that workers
are registered and have sufficient resources

Is it possible to programmatically check the status of the application (e.g.
RUNNING/WAITING)? I know that we can take the application id and query the
history server, but I would like a solution that does not involve REST calls
to the history server.

The SparkContext should know about this state, right? How can I get this
information from sc?
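
For example, would something along these lines be a reasonable heuristic? It
is only a sketch, not an official status API: it counts the executor block
managers currently registered with the driver (the map returned by
getExecutorMemoryStatus also contains the driver itself, hence the minus one)
and treats "no executors yet" as the waiting situation from the logs above:

import org.apache.spark.SparkContext

object AppStateHeuristic {
  // Number of executor block managers currently registered with the driver.
  def registeredExecutors(sc: SparkContext): Int =
    sc.getExecutorMemoryStatus.size - 1

  // No registered executors => the application is still effectively waiting
  // for resources ("Initial job has not accepted any resources").
  def looksLikeWaiting(sc: SparkContext): Boolean =
    registeredExecutors(sc) <= 0
}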


Regards,

Behroz


Re: [Worker Crashing] OutOfMemoryError: GC overhead limit exceeded

2017-03-24 Thread Behroz Sikander
Yeah, we also didn't find anything related to this online.

Are you aware of any memory leaks in the worker in Spark 1.6.2 which might be
causing this?
Do you know of any documentation that explains all the tasks a worker is
performing? Maybe we can get some clues from there.

Regards,
Behroz

On Fri, Mar 24, 2017 at 2:21 PM, Yong Zhang  wrote:

> I have never experienced a worker OOM, and I have very rarely seen it
> mentioned online. So my guess is that you will have to generate a heap dump
> file and analyze it.
>
>
> Yong
>
>
> ------
> *From:* Behroz Sikander 
> *Sent:* Friday, March 24, 2017 9:15 AM
> *To:* Yong Zhang
> *Cc:* user@spark.apache.org
> *Subject:* Re: [Worker Crashing] OutOfMemoryError: GC overhead limit
> exceeded
>
> Thank you for the response.
>
> Yes, I am sure because the driver was working fine. Only 2 workers went
> down with OOM.
>
> Regards,
> Behroz
>
> On Fri, Mar 24, 2017 at 2:12 PM, Yong Zhang  wrote:
>
>> I am not 100% sure, but normally "dispatcher-event-loop" OOM means the
>> driver OOM. Are you sure your workers OOM?
>>
>>
>> Yong
>>
>>
>> --
>> *From:* bsikander 
>> *Sent:* Friday, March 24, 2017 5:48 AM
>> *To:* user@spark.apache.org
> >> *Subject:* [Worker Crashing] OutOfMemoryError: GC overhead limit
> >> exceeded
>>
>> Spark version: 1.6.2
>> Hadoop: 2.6.0
>>
>> Cluster:
> >> All VMs are deployed on AWS.
>> 1 Master (t2.large)
>> 1 Secondary Master (t2.large)
>> 5 Workers (m4.xlarge)
>> Zookeeper (t2.large)
>>
> >> Recently, 2 of our workers went down with an out-of-memory exception:
>> java.lang.OutOfMemoryError: GC overhead limit exceeded (max heap: 1024 MB)
>>
> >> Both of these worker processes were in a hung state. We restarted them to
> >> bring them back to a normal state.
>>
>> Here is the complete exception
>> https://gist.github.com/bsikander/84f1a0f3cc831c7a120225a71e435d91
>>
>> Master's spark-default.conf file:
>> https://gist.github.com/bsikander/4027136f6a6c91eabad576495c4d797d
>>
>> Master's spark-env.sh
>> https://gist.github.com/bsikander/42f76d7a8e4079098d8a2df3cdee8ee0
>>
>> Slave's spark-default.conf file:
>> https://gist.github.com/bsikander/54264349b49e6227c6912eb14d344b8c
>>
> >> So, what could be the reason for our workers crashing with OutOfMemory?
> >> How can we avoid that in the future?
>>
>>
>>
>> --
>> View this message in context: http://apache-spark-user-list.
>> 1001560.n3.nabble.com/Worker-Crashing-OutOfMemoryError-GC-ov
>> erhead-limit-execeeded-tp28535.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>>
>


Re: [Worker Crashing] OutOfMemoryError: GC overhead limit exceeded

2017-03-24 Thread Behroz Sikander
Thank you for the response.

Yes, I am sure because the driver was working fine. Only 2 workers went
down with OOM.

Regards,
Behroz

On Fri, Mar 24, 2017 at 2:12 PM, Yong Zhang  wrote:

> I am not 100% sure, but normally "dispatcher-event-loop" OOM means the
> driver OOM. Are you sure your workers OOM?
>
>
> Yong
>
>
> --
> *From:* bsikander 
> *Sent:* Friday, March 24, 2017 5:48 AM
> *To:* user@spark.apache.org
> *Subject:* [Worker Crashing] OutOfMemoryError: GC overhead limit exceeded
>
> Spark version: 1.6.2
> Hadoop: 2.6.0
>
> Cluster:
> All VMs are deployed on AWS.
> 1 Master (t2.large)
> 1 Secondary Master (t2.large)
> 5 Workers (m4.xlarge)
> Zookeeper (t2.large)
>
> Recently, 2 of our workers went down with an out-of-memory exception:
> java.lang.OutOfMemoryError: GC overhead limit exceeded (max heap: 1024 MB)
>
> Both of these worker processes were in a hung state. We restarted them to
> bring them back to a normal state.
>
> Here is the complete exception
> https://gist.github.com/bsikander/84f1a0f3cc831c7a120225a71e435d91
>
> Master's spark-default.conf file:
> https://gist.github.com/bsikander/4027136f6a6c91eabad576495c4d797d
>
> Master's spark-env.sh
> https://gist.github.com/bsikander/42f76d7a8e4079098d8a2df3cdee8ee0
>
> Slave's spark-default.conf file:
> https://gist.github.com/bsikander/54264349b49e6227c6912eb14d344b8c
>
> So, what could be the reason for our workers crashing with OutOfMemory?
> How can we avoid that in the future?
>
>
>
>


[Worker Crashing] OutOfMemoryError: GC overhead limit exceeded

2017-03-23 Thread Behroz Sikander
Hello,
Spark version: 1.6.2
Hadoop: 2.6.0

Cluster:
All VMs are deployed on AWS.
1 Master (t2.large)
1 Secondary Master (t2.large)
5 Workers (m4.xlarge)
Zookeeper (t2.large)

Recently, 2 of our workers went down with an out-of-memory exception:

> java.lang.OutOfMemoryError: GC overhead limit exceeded (max heap: 1024 MB)


Both of these worker processes were in a hung state. We restarted them to
bring them back to a normal state.

Here is the complete exception
https://gist.github.com/bsikander/84f1a0f3cc831c7a120225a71e435d91

Master's spark-default.conf file:
https://gist.github.com/bsikander/4027136f6a6c91eabad576495c4d797d

Master's spark-env.sh
https://gist.github.com/bsikander/42f76d7a8e4079098d8a2df3cdee8ee0

Slave's spark-default.conf file:
https://gist.github.com/bsikander/54264349b49e6227c6912eb14d344b8c

So, what could be the reason for our workers crashing with OutOfMemory?
How can we avoid that in the future?
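
One thing we are considering, since the crash shows the default 1024 MB
daemon heap, is raising the worker daemon's heap in conf/spark-env.sh. I am
not sure this addresses the root cause, so please correct me if this is the
wrong knob:

# Sketch of the change we are considering on each worker.
# SPARK_DAEMON_MEMORY sets the heap of the master/worker daemons themselves
# (default 1g); SPARK_WORKER_MEMORY only caps memory handed out to executors,
# not the worker JVM's own heap.
export SPARK_DAEMON_MEMORY=2g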

Regards,
Behroz