Shutdown Spark application with failed state

2021-07-26 Thread Anil Dasari
Hi Team,

I am using Spark 2.x Streaming with Kafka.
I noticed that Spark Streaming keeps processing subsequent micro-batches after a failure, because it takes a while to notify the driver about the error and interrupt the streaming-executor thread. This creates a problem for us because we checkpoint the offsets ourselves.

To avoid this, we wanted to catch the exception inside the RDD processing and stop Spark Streaming immediately:

streamRDD.foreachRDD { (rdd, microBatchTime) =>
  try {
    // business logic for this micro-batch
  } catch {
    case ex: Exception =>
      // stop Spark Streaming immediately, without finishing queued batches
      streamingContext.stop(stopSparkContext = true, stopGracefully = false)
  }
}

But the Spark application state is then set to Completed, so the application is not restarted automatically by Spark (even with the max-attempts config).

I looked for a way to report the error during shutdown so that the application status is set to Failed. ContextWaiter#notifyError is scoped to the streaming package, and I couldn't find any other interface for propagating the error/exception to the stop process.

How can I tell Spark Streaming to stop processing subsequent micro-batches when a micro-batch throws an exception? Is it possible to configure Spark to create one micro-batch RDD at a time?
And how can I stop the streaming context with an error?
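
One workaround I am considering, building on the snippet above (an untested sketch; it assumes the cluster manager treats a non-zero driver exit as a failed attempt): remember the first exception, stop the context as before, and re-throw the exception from the main thread after awaitTermination, so the driver JVM exits with a non-zero status.

import java.util.concurrent.atomic.AtomicReference

// Remember the first failure seen in any micro-batch.
val firstError = new AtomicReference[Throwable](null)

streamRDD.foreachRDD { (rdd, microBatchTime) =>
  try {
    // business logic for this micro-batch
  } catch {
    case ex: Exception =>
      firstError.compareAndSet(null, ex)
      streamingContext.stop(stopSparkContext = true, stopGracefully = false)
  }
}

streamingContext.start()
streamingContext.awaitTermination()

// Back on the main thread: if a batch failed, surface it as a driver failure
// instead of letting the application end in the Completed state.
Option(firstError.get()).foreach(ex => throw ex)

I have not verified that this is enough to make the resource manager record the attempt as failed, which is really what I am asking about.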

Any help would be appreciated. Thanks in advance.

Regards.


Why PySpark with spark-submit throws error trying to untar --archives pyspark_venv.tar.gz

2021-07-26 Thread Mich Talebzadeh
Hi,


Maybe someone can shed some light on this.


I am running a PySpark job in minikube.


Because it is PySpark, the following two spark-submit options are used:


   spark-submit --verbose \
      --master k8s://$K8S_SERVER \
      --deploy-mode cluster \
      --name pytest \
      --py-files hdfs://$HDFS_HOST:$HDFS_PORT/minikube/codes/DSBQ.zip \
      --archives hdfs://$HDFS_HOST:$HDFS_PORT/minikube/codes/${pyspark_venv}.tar.gz#${pyspark_venv} \

The first option, --py-files, sends the zipped PySpark project.

The second, --archives, sends the package dependencies created with conda.
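
For context, the venv archive is built from a conda environment roughly as follows (a minimal sketch assuming conda-pack; the environment name and output file simply mirror the command above):

# Minimal sketch, assuming conda-pack is installed; "pyspark_venv" is an
# existing conda environment that already has the needed packages (pandas etc.).
import conda_pack

conda_pack.pack(
    name="pyspark_venv",            # conda environment to package
    output="pyspark_venv.tar.gz",   # relocatable archive shipped via --archives
    force=True,                     # overwrite a previous archive if present
)

The unpacked directory name comes from the #pyspark_venv fragment, and the Python workers would typically be pointed at ./pyspark_venv/bin/python via spark.pyspark.python.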


This is the output from spark-submit:


Parsed arguments:
  master                  k8s://192.168.49.2:8443
  deployMode              cluster
  executorMemory          5000m
  executorCores           1
  totalExecutorCores      null
  propertiesFile          /opt/spark/conf/spark-defaults.conf
  driverMemory            null
  driverCores             null
  driverExtraClassPath    $SPARK_HOME/jars/*.jar
  driverExtraLibraryPath  null
  driverExtraJavaOptions  null
  supervise               false
  queue                   null
  numExecutors            2
  files                   null
  pyFiles                 hdfs://50.140.197.220:9000/minikube/codes/DSBQ.zip
  archives                hdfs://50.140.197.220:9000/minikube/codes/pyspark_venv.tar.gz#pyspark_venv
  mainClass               null
  primaryResource         hdfs://50.140.197.220:9000/minikube/codes/testpackages.py
  name                    pytest
  childArgs               []
  jars                    null
  packages                null
  packagesExclusions      null
  repositories            null
  verbose                 true

Spark then tries to unpack that tar.gz file. In the Python code I am trying to import pandas.

This is what happens according to the pod logs:


Unpacking an archive hdfs://50.140.197.220:9000/minikube/codes/pyspark_venv.tar.gz#pyspark_venv
from /tmp/spark-57c6ace6-c01f-420c-ab88-0cdb9015eb92/pyspark_venv.tar.gz
to /opt/spark/work-dir/./pyspark_venv
Exception in thread "main" ExitCodeException exitCode=2: tar: lib/python3.7/site-packages/pandas/tests/util/__pycache__/test_assert_categorical_equal.cpython-37.pyc: Cannot open: Cannot allocate memory


However, this works fine when I run the code in local mode as opposed to k8s!


thanks

