I don't see any reason to think this is related to YARN. You haven't shown the actual error, @rajat, so I'm not sure there is anything to say.
On Fri, May 7, 2021 at 3:08 PM Mich Talebzadeh <mich.talebza...@gmail.com> wrote:

> I have a suspicion that this may be caused by your cluster, as it appears
> that you are running this in YARN mode, like below:
>
>   spark-submit --master yarn --deploy-mode client xyx.py
>
> What happens if you try running it in local mode?
>
>   spark-submit --master local[2] xyx.py
>
> Is this run in a managed cluster like GCP Dataproc?
>
> HTH
>
> On Fri, 7 May 2021 at 19:17, rajat kumar <kumar.rajat20...@gmail.com> wrote:
>
>> Thanks Mich and Sean for the response. Yes, Sean is right. This is a
>> batch job.
>>
>> I have only 10 records in the dataframe and it is still giving this
>> exception.
>>
>> The full logs follow:
>>
>>   File "/opt/spark/python/lib/pyspark.zip/pyspark/sql/dataframe.py", line 584, in foreach
>>     self.rdd.foreach(f)
>>   File "/opt/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 789, in foreach
>>     self.mapPartitions(processPartition).count()  # Force evaluation
>>   File "/opt/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 1055, in count
>>     return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
>>   File "/opt/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 1046, in sum
>>     return self.mapPartitions(lambda x: [sum(x)]).fold(0, operator.add)
>>   File "/opt/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 917, in fold
>>     vals = self.mapPartitions(func).collect()
>>   File "/opt/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 816, in collect
>>     sock_info = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
>>   File "/opt/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
>>     answer, self.gateway_client, self.target_id, self.name)
>>   File "/opt/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
>>     return f(*a, **kw)
>>   File "/opt/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
>>     format(target_id, ".", name), value)
>> py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
>> : org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 3.0 failed 4 times, most recent failure: Lost task 1.3 in stage 3.0 (TID 10, 10.244.158.5, executor 1): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
>>   File "/opt/spark/python/lib/pyspark.zip/pyspark/worker.py", line 364, in main
>>     func, profiler, deserializer, serializer = read_command(pickleSer, infile)
>>   File "/opt/spark/python/lib/pyspark.zip/pyspark/worker.py", line 69, in read_command
>>     command = serializer._read_with_length(file)
>>   File "/opt/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 172, in _read_with_length
>>     return self.loads(obj)
>>   File "/opt/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 580, in loads
>>     return pickle.loads(obj, encoding=encoding)
>>   File "/opt/dataflow/python/lib/python3.6/site-packages/module/read_data.py", line 10, in <module>
>>     spark = SparkSession.builder.appName("test").getOrCreate()
>>   File "/opt/spark/python/lib/pyspark.zip/pyspark/sql/session.py", line 173, in getOrCreate
>>     sc = SparkContext.getOrCreate(sparkConf)
>>   File "/opt/spark/python/lib/pyspark.zip/pyspark/context.py", line 367, in getOrCreate
>>     SparkContext(conf=conf or SparkConf())
>>   File "/opt/spark/python/lib/pyspark.zip/pyspark/context.py", line 133, in __init__
>>     SparkContext._ensure_initialized(self, gateway=gateway, conf=conf)
>>   File "/opt/spark/python/lib/pyspark.zip/pyspark/context.py", line 316, in _ensure_initialized
>>     SparkContext._gateway = gateway or launch_gateway(conf)
>>   File "/opt/spark/python/lib/pyspark.zip/pyspark/java_gateway.py", line 46, in launch_gateway
>>     return _launch_gateway(conf)
>>   File "/opt/spark/python/lib/pyspark.zip/pyspark/java_gateway.py", line 108, in _launch_gateway
>>     raise Exception("Java gateway process exited before sending its port number")
>> Exception: Java gateway process exited before sending its port number
>>
>> On Fri, May 7, 2021 at 9:35 PM Sean Owen <sro...@gmail.com> wrote:
>>
>>> foreach definitely works :)
>>> This is not a streaming question.
>>> The error says that the JVM worker died for some reason. You'd have to
>>> look at its logs to see why.
>>>
>>> On Fri, May 7, 2021 at 11:03 AM Mich Talebzadeh <mich.talebza...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> I am not convinced foreach works even in 3.1.1.
>>>> Try doing the same with foreachBatch:
>>>>
>>>>   foreachBatch(sendToSink). \
>>>>   trigger(processingTime='2 seconds'). \
>>>>
>>>> and see if it works.
>>>>
>>>> HTH
>>>>
>>>> On Fri, 7 May 2021 at 16:07, rajat kumar <kumar.rajat20...@gmail.com> wrote:
>>>>
>>>>> Hi Team,
>>>>>
>>>>> I am using Spark 2.4.4 with Python.
>>>>>
>>>>> While using the below line:
>>>>>
>>>>>   dataframe.foreach(lambda record: process_logs(record))
>>>>>
>>>>> My use case is that process_logs will download the file from cloud
>>>>> storage using Python code and then save the processed data.
>>>>>
>>>>> I am getting the following error:
>>>>>
>>>>>   File "/opt/spark/python/lib/pyspark.zip/pyspark/java_gateway.py", line 46, in launch_gateway
>>>>>     return _launch_gateway(conf)
>>>>>   File "/opt/spark/python/lib/pyspark.zip/pyspark/java_gateway.py", line 108, in _launch_gateway
>>>>>     raise Exception("Java gateway process exited before sending its port number")
>>>>> Exception: Java gateway process exited before sending its port number
>>>>>
>>>>> Can anyone please suggest what can be done?
>>>>>
>>>>> Thanks,
>>>>> Rajat
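
The executor-side traceback quoted above appears to point at the root cause: module/read_data.py creates a SparkSession at import time (line 10, in <module>), so when the pickled foreach function is deserialized on a worker, that import runs again on the executor and tries to launch a fresh Java gateway there, which is exactly where "Java gateway process exited before sending its port number" is raised. Below is a minimal sketch of one way to restructure that module so nothing Spark-related runs at import time. The names read_data.py and process_logs come from the thread; the body of process_logs, the main() wrapper, and the logs.json input are hypothetical placeholders, since the original code was not posted.

    # read_data.py -- hypothetical sketch, not the code from the thread.
    # Nothing at module level builds a SparkSession, because this module is
    # re-imported inside executor Python workers when process_logs is unpickled.
    from pyspark.sql import SparkSession

    def process_logs(record):
        # Runs on executors: plain Python only, no SparkSession/SparkContext here.
        # Placeholder -- e.g. download the file named in `record` from cloud
        # storage with an ordinary Python client and save the processed data.
        pass

    def main():
        # The SparkSession is created only on the driver, inside main(),
        # never at import time.
        spark = SparkSession.builder.appName("test").getOrCreate()
        dataframe = spark.read.json("logs.json")  # placeholder input
        dataframe.foreach(process_logs)
        spark.stop()

    if __name__ == "__main__":
        main()

The same effect could likely be had by simply guarding the existing getOrCreate() call behind if __name__ == "__main__":, so that importing the module on a worker no longer tries to start a second JVM gateway.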