I don't see any reason to think this is related to YARN.
You haven't shown the actual error, @rajat, so I'm not sure there is anything
to say.
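
That said, the traceback quoted below does hint at one thing: the worker fails while unpickling the function, inside `read_data.py` at line 10 (`<module>`), where `SparkSession.builder.appName("test").getOrCreate()` runs at import time. One possible reading is that the executor imports this module and tries to start its own Java gateway. Below is a minimal sketch of a layout that avoids creating the session at import time. Only `process_logs`, the file name, and the app name come from the thread; `get_spark`, `main`, the body of `process_logs`, and the sample DataFrame are hypothetical placeholders.

```python
# read_data.py (hypothetical layout; only the file name comes from the traceback)


def process_logs(record):
    """Hypothetical stand-in for the real per-record work; runs on executors."""
    # Keep this function free of any SparkSession/SparkContext use.
    return str(record).upper()


def get_spark(app_name="test"):
    """Create the session on demand; intended to be called on the driver only."""
    # Imported inside the function to keep module import light. The essential
    # point is that getOrCreate() is not called when the module is imported.
    from pyspark.sql import SparkSession

    return SparkSession.builder.appName(app_name).getOrCreate()


def main():
    """Driver-side entry point (assumed structure, not the original job)."""
    spark = get_spark()
    # Tiny illustrative DataFrame; the real job reads its own data.
    dataframe = spark.createDataFrame([("a",), ("b",)], ["line"])
    dataframe.foreach(process_logs)
    spark.stop()
```

The driver script would then call `main()` under an `if __name__ == "__main__":` guard, so that importing the module on an executor has no Spark side effects.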

On Fri, May 7, 2021 at 3:08 PM Mich Talebzadeh <mich.talebza...@gmail.com>
wrote:

> I have a suspicion that this may be caused by your cluster, as it appears
> that you are running this in YARN mode, like below:
>
> spark-submit --master yarn --deploy-mode client xyx.py
>
> What happens if you try running it in local mode?
>
> spark-submit --master local[2] xyx.py
>
> Is this running in a managed cluster like GCP Dataproc?
>
> HTH
>
>
>    view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Fri, 7 May 2021 at 19:17, rajat kumar <kumar.rajat20...@gmail.com>
> wrote:
>
>> Thanks Mich and Sean for the response. Yes, Sean is right. This is a
>> batch job.
>>
>> I have only 10 records in the DataFrame, yet it still throws this
>> exception.
>>
>> The full logs follow.
>>
>>   File "/opt/spark/python/lib/pyspark.zip/pyspark/sql/dataframe.py", line 584, in foreach
>>     self.rdd.foreach(f)
>>   File "/opt/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 789, in foreach
>>     self.mapPartitions(processPartition).count()  # Force evaluation
>>   File "/opt/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 1055, in count
>>     return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
>>   File "/opt/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 1046, in sum
>>     return self.mapPartitions(lambda x: [sum(x)]).fold(0, operator.add)
>>   File "/opt/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 917, in fold
>>     vals = self.mapPartitions(func).collect()
>>   File "/opt/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 816, in collect
>>     sock_info = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
>>   File "/opt/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
>>     answer, self.gateway_client, self.target_id, self.name)
>>   File "/opt/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
>>     return f(*a, **kw)
>>   File "/opt/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
>>     format(target_id, ".", name), value)
>> py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
>> : org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 3.0 failed 4 times, most recent failure: Lost task 1.3 in stage 3.0 (TID 10, 10.244.158.5, executor 1): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
>>   File "/opt/spark/python/lib/pyspark.zip/pyspark/worker.py", line 364, in main
>>     func, profiler, deserializer, serializer = read_command(pickleSer, infile)
>>   File "/opt/spark/python/lib/pyspark.zip/pyspark/worker.py", line 69, in read_command
>>     command = serializer._read_with_length(file)
>>   File "/opt/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 172, in _read_with_length
>>     return self.loads(obj)
>>   File "/opt/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 580, in loads
>>     return pickle.loads(obj, encoding=encoding)
>>   File "/opt/dataflow/python/lib/python3.6/site-packages/module/read_data.py", line 10, in <module>
>>     spark = SparkSession.builder.appName("test").getOrCreate()
>>   File "/opt/spark/python/lib/pyspark.zip/pyspark/sql/session.py", line 173, in getOrCreate
>>     sc = SparkContext.getOrCreate(sparkConf)
>>   File "/opt/spark/python/lib/pyspark.zip/pyspark/context.py", line 367, in getOrCreate
>>     SparkContext(conf=conf or SparkConf())
>>   File "/opt/spark/python/lib/pyspark.zip/pyspark/context.py", line 133, in __init__
>>     SparkContext._ensure_initialized(self, gateway=gateway, conf=conf)
>>   File "/opt/spark/python/lib/pyspark.zip/pyspark/context.py", line 316, in _ensure_initialized
>>     SparkContext._gateway = gateway or launch_gateway(conf)
>>   File "/opt/spark/python/lib/pyspark.zip/pyspark/java_gateway.py", line 46, in launch_gateway
>>     return _launch_gateway(conf)
>>   File "/opt/spark/python/lib/pyspark.zip/pyspark/java_gateway.py", line 108, in _launch_gateway
>>     raise Exception("Java gateway process exited before sending its port number")
>> Exception: Java gateway process exited before sending its port number
>>
>> On Fri, May 7, 2021 at 9:35 PM Sean Owen <sro...@gmail.com> wrote:
>>
>>> foreach definitely works :)
>>> This is not a streaming question.
>>> The error says that the JVM worker died for some reason. You'd have to
>>> look at its logs to see why.
>>>
>>> On Fri, May 7, 2021 at 11:03 AM Mich Talebzadeh <
>>> mich.talebza...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> I am not convinced foreach works, even in 3.1.1.
>>>> Try doing the same with foreachBatch:
>>>>
>>>>                      foreachBatch(sendToSink). \
>>>>                     trigger(processingTime='2 seconds'). \
>>>>
>>>> and see if it works.
>>>>
>>>> HTH
>>>>
>>>> On Fri, 7 May 2021 at 16:07, rajat kumar <kumar.rajat20...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi Team,
>>>>>
>>>>> I am using Spark 2.4.4 with Python.
>>>>>
>>>>> I am running the line below:
>>>>>
>>>>> dataframe.foreach(lambda record: process_logs(record))
>>>>>
>>>>>
>>>>> My use case is that process_logs downloads a file from cloud
>>>>> storage using Python code and then saves the processed data.
>>>>>
>>>>> I am getting the following error:
>>>>>
>>>>>   File "/opt/spark/python/lib/pyspark.zip/pyspark/java_gateway.py", line 46, in launch_gateway
>>>>>     return _launch_gateway(conf)
>>>>>   File "/opt/spark/python/lib/pyspark.zip/pyspark/java_gateway.py", line 108, in _launch_gateway
>>>>>     raise Exception("Java gateway process exited before sending its port number")
>>>>> Exception: Java gateway process exited before sending its port number
>>>>>
>>>>> Can anyone please suggest what can be done?
>>>>>
>>>>> Thanks
>>>>> Rajat
>>>>>
>>>>
