[jira] [Updated] (SPARK-24657) SortMergeJoin may cause SparkOutOfMemory in execution memory because of not cleanup resource when finished the merge join

2018-06-25 Thread Joshuawangzj (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24657?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joshuawangzj updated SPARK-24657:
-
Description: 
My SQL joins three tables, all of them small (about 2 MB each). To solve the 
small-files issue I use coalesce(1), but it throws an OOM exception: 
{code:java}
org.apache.spark.memory.SparkOutOfMemoryError: Unable to acquire 65536 bytes of 
memory, got 0
at 
org.apache.spark.memory.MemoryConsumer.throwOom(MemoryConsumer.java:159)
at 
org.apache.spark.memory.MemoryConsumer.allocateArray(MemoryConsumer.java:99)
at 
org.apache.spark.util.collection.unsafe.sort.UnsafeInMemorySorter.<init>(UnsafeInMemorySorter.java:128)
at 
org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.<init>(UnsafeExternalSorter.java:162)
at 
org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.create(UnsafeExternalSorter.java:129)
at 
org.apache.spark.sql.execution.UnsafeExternalRowSorter.<init>(UnsafeExternalRowSorter.java:111)
at 
org.apache.spark.sql.execution.UnsafeExternalRowSorter.create(UnsafeExternalRowSorter.java:96)
at 
org.apache.spark.sql.execution.SortExec.createSorter(SortExec.scala:89)
at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage7.init(generated.java:22)
at 
org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10.apply(WholeStageCodegenExec.scala:611)
at 
org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10.apply(WholeStageCodegenExec.scala:608)
at 
org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$26.apply(RDD.scala:847)
at 
org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$26.apply(RDD.scala:847)
{code}
{code:java}
12:10:51.175 WARN org.apache.spark.scheduler.TaskSetManager: Lost task 0.0 in 
stage 3.0 (TID 34, localhost, executor driver): 
org.apache.spark.memory.SparkOutOfMemoryError: Unable to acquire 65536 bytes of 
memory, got 0
at 
org.apache.spark.memory.MemoryConsumer.throwOom(MemoryConsumer.java:159)
at 
org.apache.spark.memory.MemoryConsumer.allocateArray(MemoryConsumer.java:99)
at 
org.apache.spark.util.collection.unsafe.sort.UnsafeInMemorySorter.<init>(UnsafeInMemorySorter.java:128)
at 
org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.<init>(UnsafeExternalSorter.java:162)
at 
org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.create(UnsafeExternalSorter.java:129)
at 
org.apache.spark.sql.execution.UnsafeExternalRowSorter.<init>(UnsafeExternalRowSorter.java:111)
at 
org.apache.spark.sql.execution.UnsafeExternalRowSorter.create(UnsafeExternalRowSorter.java:96)
at 
org.apache.spark.sql.execution.SortExec.createSorter(SortExec.scala:89)
at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage7.init(generated.java:22)
{code}

Finally I found the cause by studying the source code. The task cannot allocate a 
page (in my case the page size is 32 MB) from the MemoryManager, because coalesce 
makes a single task run all 20 parent partitions 
(spark.sql.shuffle.partitions=20), and after the sort merge join of each parent 
partition the UnsafeExternalRowSorter cannot clean up some of the pages it 
allocated. After running the 14th parent partition (in my case), there is no 
longer enough execution memory left to acquire a page for the sort. 
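
For reference, here is a minimal sketch of the scenario described above. The table 
paths, the join column "id", and the option values are illustrative assumptions, 
not the actual job; it only shows the shape of a plan where three small tables are 
sort-merge joined and coalesce(1) folds all 20 shuffle partitions into one task.
{code:title=SmjCoalesceRepro.scala|borderStyle=solid}
// Hypothetical reproduction sketch of the scenario above. Table paths, the join
// column, and sizes are assumptions for illustration, not the original job.
import org.apache.spark.sql.SparkSession

object SmjCoalesceRepro {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("smj-coalesce-repro")
      .config("spark.sql.shuffle.partitions", "20")          // 20 parent partitions, as in the report
      .config("spark.sql.autoBroadcastJoinThreshold", "-1")  // force SortMergeJoin even for small tables
      .getOrCreate()

    // Three small tables (~2 MB each, assumed).
    val t1 = spark.read.parquet("/tmp/t1")
    val t2 = spark.read.parquet("/tmp/t2")
    val t3 = spark.read.parquet("/tmp/t3")

    val joined = t1.join(t2, "id").join(t3, "id")

    // coalesce(1) makes one task compute all 20 parent partitions in sequence;
    // pages left behind by each per-partition sort accumulate in execution memory.
    joined.coalesce(1).write.mode("overwrite").parquet("/tmp/out")

    spark.stop()
  }
}
{code}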

Why can't UnsafeExternalRowSorter clean up its pages after the join of a parent 
partition has finished?
After repeated attempts, I traced the problem to SortMergeJoinScanner. 
UnsafeExternalRowSorter only cleans up its resources once its iterator has been 
advanced to the end. But in SortMergeJoinScanner, when the streamed iterator is 
exhausted the buffered iterator may not be, so the buffered side cannot clean up 
its resources, and vice versa.

Possible solutions:
1. When one iterator has been fully traversed, advance the other iterator to the 
end as well. This may hurt performance because of the unnecessary traversal (a 
sketch of this option follows below).
2. When one iterator has been fully traversed, invoke the other sorter's cleanup 
method directly. This requires a larger change to the source code.
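
A minimal sketch of option 1, kept separate from the real SortMergeJoinScanner 
code (the iterator and callback names here are illustrative assumptions): once one 
side of the merge join is exhausted, the other side is drained so that a sorter 
which only frees its pages when its iterator reaches the end gets the chance to do 
so before the next parent partition starts.
{code:title=DrainOnFinish.scala|borderStyle=solid}
// Illustrative sketch of option 1 (not the actual SortMergeJoinScanner code):
// once one side of the merge join is exhausted, drain the other side's iterator
// so that a sorter which only frees its pages on exhaustion gets a chance to do so.
object DrainOnFinish {
  // Stand-in for an iterator backed by a sorter that releases memory only when
  // hasNext first returns false (the behavior described for UnsafeExternalRowSorter).
  final class SorterIterator[T](data: Seq[T], onExhausted: () => Unit) extends Iterator[T] {
    private val it = data.iterator
    private var cleaned = false
    override def hasNext: Boolean = {
      val more = it.hasNext
      if (!more && !cleaned) { cleaned = true; onExhausted() } // free pages here
      more
    }
    override def next(): T = it.next()
  }

  /** Drain whatever is left of `iter` purely for its cleanup side effect. */
  def drainForCleanup[T](iter: Iterator[T]): Unit =
    while (iter.hasNext) iter.next()

  def main(args: Array[String]): Unit = {
    val streamed = new SorterIterator(Seq(1, 2, 3), () => println("streamed side released pages"))
    val buffered = new SorterIterator(Seq(1, 2), () => println("buffered side released pages"))

    // The merge-join loop stops as soon as either side runs out...
    while (streamed.hasNext && buffered.hasNext) { streamed.next(); buffered.next() }

    // ...so explicitly drain both sides before moving on to the next parent partition.
    drainForCleanup(streamed)
    drainForCleanup(buffered)
  }
}
{code}
The same effect could also be achieved with a direct cleanup call (option 2), at 
the cost of threading a cleanup handle through the join code.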

  was:
In my sql, It join three tables, and all these tables are small table (about 
2mb). And to solve the small files issue, I use coalesce(1). But it throw the 
oom exception: 
{code:java}
org.apache.spark.memory.SparkOutOfMemoryError: Unable to acquire 65536 bytes of 
memory, got 0
at 
org.apache.spark.memory.MemoryConsumer.throwOom(MemoryConsumer.java:159)
at 
org.apache.spark.memory.MemoryConsumer.allocateArray(MemoryConsumer.java:99)
at 
org.apache.spark.util.collection.unsafe.sort.UnsafeInMemorySorter.(UnsafeInMemorySorter.java:128)
at 

[jira] [Updated] (SPARK-24657) SortMergeJoin may cause SparkOutOfMemory in execution memory because of not cleanup resource when finished the merge join

2018-06-25 Thread Joshuawangzj (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24657?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joshuawangzj updated SPARK-24657:
-
Description: 
My SQL joins three tables, all of them small (about 2 MB each). To solve the 
small-files issue I use coalesce(1), but it throws an OOM exception: 
{code:java}
org.apache.spark.memory.SparkOutOfMemoryError: Unable to acquire 65536 bytes of 
memory, got 0
at 
org.apache.spark.memory.MemoryConsumer.throwOom(MemoryConsumer.java:159)
at 
org.apache.spark.memory.MemoryConsumer.allocateArray(MemoryConsumer.java:99)
at 
org.apache.spark.util.collection.unsafe.sort.UnsafeInMemorySorter.(UnsafeInMemorySorter.java:128)
at 
org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.(UnsafeExternalSorter.java:162)
at 
org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.create(UnsafeExternalSorter.java:129)
at 
org.apache.spark.sql.execution.UnsafeExternalRowSorter.(UnsafeExternalRowSorter.java:111)
at 
org.apache.spark.sql.execution.UnsafeExternalRowSorter.create(UnsafeExternalRowSorter.java:96)
at 
org.apache.spark.sql.execution.SortExec.createSorter(SortExec.scala:89)
at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage7.init(generated.java:22)
at 
org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10.apply(WholeStageCodegenExec.scala:611)
at 
org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10.apply(WholeStageCodegenExec.scala:608)
at 
org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$26.apply(RDD.scala:847)
at 
org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$26.apply(RDD.scala:847)
{code}
{code:java}
12:10:51.175 WARN org.apache.spark.scheduler.TaskSetManager: Lost task 0.0 in 
stage 3.0 (TID 34, localhost, executor driver): 
org.apache.spark.memory.SparkOutOfMemoryError: Unable to acquire 65536 bytes of 
memory, got 0
at 
org.apache.spark.memory.MemoryConsumer.throwOom(MemoryConsumer.java:159)
at 
org.apache.spark.memory.MemoryConsumer.allocateArray(MemoryConsumer.java:99)
at 
org.apache.spark.util.collection.unsafe.sort.UnsafeInMemorySorter.(UnsafeInMemorySorter.java:128)
at 
org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.(UnsafeExternalSorter.java:162)
at 
org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.create(UnsafeExternalSorter.java:129)
at 
org.apache.spark.sql.execution.UnsafeExternalRowSorter.(UnsafeExternalRowSorter.java:111)
at 
org.apache.spark.sql.execution.UnsafeExternalRowSorter.create(UnsafeExternalRowSorter.java:96)
at 
org.apache.spark.sql.execution.SortExec.createSorter(SortExec.scala:89)
at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage7.init(generated.java:22)
{code}

Finally I found the cause by studying the source code. The task cannot allocate a 
page (in my case the page size is 32 MB) from the MemoryManager, because coalesce 
makes a single task run all 20 parent partitions 
(spark.sql.shuffle.partitions=20), and after the sort merge join of each parent 
partition the UnsafeExternalRowSorter cannot clean up some of the pages it 
allocated. After running the 14th parent partition (in my case), there is no 
longer enough execution memory left to acquire a page for the sort. 

Why can't UnsafeExternalRowSorter clean up its pages after the join of a parent 
partition has finished?

  was:
{code:java}
org.apache.spark.memory.SparkOutOfMemoryError: Unable to acquire 65536 bytes of 
memory, got 0
at 
org.apache.spark.memory.MemoryConsumer.throwOom(MemoryConsumer.java:159)
at 
org.apache.spark.memory.MemoryConsumer.allocateArray(MemoryConsumer.java:99)
at 
org.apache.spark.util.collection.unsafe.sort.UnsafeInMemorySorter.(UnsafeInMemorySorter.java:128)
at 
org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.(UnsafeExternalSorter.java:162)
at 
org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.create(UnsafeExternalSorter.java:129)
at 
org.apache.spark.sql.execution.UnsafeExternalRowSorter.(UnsafeExternalRowSorter.java:111)
at 
org.apache.spark.sql.execution.UnsafeExternalRowSorter.create(UnsafeExternalRowSorter.java:96)
at 
org.apache.spark.sql.execution.SortExec.createSorter(SortExec.scala:89)
at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage7.init(generated.java:22)
at 
org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10.apply(WholeStageCodegenExec.scala:611)
at 
org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10.apply(WholeStageCodegenExec.scala:608)
at 

[jira] [Created] (SPARK-24657) SortMergeJoin may cause SparkOutOfMemory in execution memory because of not cleaning resource when finished the merge join

2018-06-25 Thread Joshuawangzj (JIRA)
Joshuawangzj created SPARK-24657:


 Summary: SortMergeJoin may cause SparkOutOfMemory  in execution 
memory  because of not cleaning resource when finished the merge join
 Key: SPARK-24657
 URL: https://issues.apache.org/jira/browse/SPARK-24657
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 2.3.1, 2.3.0, 2.2.0
Reporter: Joshuawangzj


{code:java}
org.apache.spark.memory.SparkOutOfMemoryError: Unable to acquire 65536 bytes of 
memory, got 0
at 
org.apache.spark.memory.MemoryConsumer.throwOom(MemoryConsumer.java:159)
at 
org.apache.spark.memory.MemoryConsumer.allocateArray(MemoryConsumer.java:99)
at 
org.apache.spark.util.collection.unsafe.sort.UnsafeInMemorySorter.(UnsafeInMemorySorter.java:128)
at 
org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.(UnsafeExternalSorter.java:162)
at 
org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.create(UnsafeExternalSorter.java:129)
at 
org.apache.spark.sql.execution.UnsafeExternalRowSorter.(UnsafeExternalRowSorter.java:111)
at 
org.apache.spark.sql.execution.UnsafeExternalRowSorter.create(UnsafeExternalRowSorter.java:96)
at 
org.apache.spark.sql.execution.SortExec.createSorter(SortExec.scala:89)
at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage7.init(generated.java:22)
at 
org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10.apply(WholeStageCodegenExec.scala:611)
at 
org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10.apply(WholeStageCodegenExec.scala:608)
at 
org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$26.apply(RDD.scala:847)
at 
org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$26.apply(RDD.scala:847)
{code}
{code:java}
12:10:51.175 WARN org.apache.spark.scheduler.TaskSetManager: Lost task 0.0 in 
stage 3.0 (TID 34, localhost, executor driver): 
org.apache.spark.memory.SparkOutOfMemoryError: Unable to acquire 65536 bytes of 
memory, got 0
at 
org.apache.spark.memory.MemoryConsumer.throwOom(MemoryConsumer.java:159)
at 
org.apache.spark.memory.MemoryConsumer.allocateArray(MemoryConsumer.java:99)
at 
org.apache.spark.util.collection.unsafe.sort.UnsafeInMemorySorter.(UnsafeInMemorySorter.java:128)
at 
org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.(UnsafeExternalSorter.java:162)
at 
org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.create(UnsafeExternalSorter.java:129)
at 
org.apache.spark.sql.execution.UnsafeExternalRowSorter.(UnsafeExternalRowSorter.java:111)
at 
org.apache.spark.sql.execution.UnsafeExternalRowSorter.create(UnsafeExternalRowSorter.java:96)
at 
org.apache.spark.sql.execution.SortExec.createSorter(SortExec.scala:89)
at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage7.init(generated.java:22)
{code}






[jira] [Updated] (SPARK-21045) Spark executor blocked instead of throwing exception because exception occur when python worker send exception info to PythonRDD in Python 2+

2017-06-14 Thread Joshuawangzj (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-21045?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joshuawangzj updated SPARK-21045:
-
Summary: Spark executor blocked instead of throwing exception because 
exception occur when python worker send exception info to PythonRDD in Python 
2+  (was: Spark executor blocked instead of throwing exception because 
exception occur when python worker send exception info to Java Gateway in 
Python 2+)

> Spark executor blocked instead of throwing exception because exception occur 
> when python worker send exception info to PythonRDD in Python 2+
> -
>
> Key: SPARK-21045
> URL: https://issues.apache.org/jira/browse/SPARK-21045
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark
>Affects Versions: 2.0.1, 2.0.2, 2.1.1
> Environment: It has problem only in Python 2+. 
> Python 3+ is ok.
>Reporter: Joshuawangzj
>
> My PySpark program always blocks in our production YARN cluster. I ran jstack 
> and found:
> {code}
> "Executor task launch worker for task 0" #60 daemon prio=5 os_prio=31 
> tid=0x7fb2f44e3000 nid=0xa003 runnable [0x000123b4a000]
>java.lang.Thread.State: RUNNABLE
> at java.net.SocketInputStream.socketRead0(Native Method)
> at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
> at java.net.SocketInputStream.read(SocketInputStream.java:170)
> at java.net.SocketInputStream.read(SocketInputStream.java:141)
> at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
> at java.io.BufferedInputStream.read(BufferedInputStream.java:265)
> - locked <0x0007acab1c98> (a java.io.BufferedInputStream)
> at java.io.DataInputStream.readInt(DataInputStream.java:387)
> at 
> org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:190)
> at 
> org.apache.spark.api.python.PythonRunner$$anon$1.(PythonRDD.scala:234)
> at 
> org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
> at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
> at org.apache.spark.scheduler.Task.run(Task.scala:99)
> at 
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
> {code}
> It is blocked in a socket read. I checked the log on the blocked executor and 
> found this error:
> {code}
> Traceback (most recent call last):
>   File "/opt/spark/python/lib/pyspark.zip/pyspark/worker.py", line 178, in 
> main
> write_with_length(traceback.format_exc().encode("utf-8"), outfile)
> UnicodeDecodeError: 'ascii' codec can't decode byte 0xe4 in position 618: 
> ordinal not in range(128)
> {code}
> Finally I found the problem:
> {code:title=worker.py|borderStyle=solid}
> # 178 line in spark 2.1.1
> except Exception:
> try:
> write_int(SpecialLengths.PYTHON_EXCEPTION_THROWN, outfile)
> write_with_length(traceback.format_exc().encode("utf-8"), outfile)
> except IOError:
> # JVM close the socket
> pass
> except Exception:
> # Write the error to stderr if it happened while serializing
> print("PySpark worker failed with exception:", file=sys.stderr)
> print(traceback.format_exc(), file=sys.stderr)
> {code}
> When write_with_length(traceback.format_exc().encode("utf-8"), outfile) itself 
> raises an exception such as UnicodeDecodeError, the Python worker cannot send the 
> traceback, but once PythonRDD has read PYTHON_EXCEPTION_THROWN it expects to read 
> the traceback length next. So it blocks.
> {code:title=PythonRDD.scala|borderStyle=solid}
> # 190 line in spark 2.1.1
> case SpecialLengths.PYTHON_EXCEPTION_THROWN =>
>  // Signals that an exception has been thrown in python
>  val exLength = stream.readInt()  // It is possible to be blocked
> {code}
> {color:red}
> We can trigger the bug with a simple program:
> {color}
> {code:title=test.py|borderStyle=solid}
> spark = SparkSession.builder.master('local').getOrCreate()
> rdd = spark.sparkContext.parallelize(['中']).map(lambda x: 
> x.encode("utf8"))
> rdd.collect()
> {code}
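
For what it is worth, the framing mismatch described above can be illustrated 
outside of Spark with a small standalone sketch (this is not Spark's PythonRunner 
code, and the marker value is an assumption for illustration): the "worker" side 
writes the exception marker but fails before writing the length-prefixed 
traceback, and the "JVM" side, having read the marker, blocks on readInt() waiting 
for a length that never arrives. A read timeout is used here only so the demo 
terminates instead of hanging like the real reader.
{code:title=FramingDeadlockDemo.scala|borderStyle=solid}
// Standalone sketch of the framing mismatch described above. This is not Spark's
// PythonRunner code, and the marker value is an assumption for illustration only.
import java.io.{DataInputStream, DataOutputStream}
import java.net.{ServerSocket, Socket, SocketTimeoutException}

object FramingDeadlockDemo {
  val PYTHON_EXCEPTION_THROWN: Int = -2 // assumed marker value

  def main(args: Array[String]): Unit = {
    val server = new ServerSocket(0)

    // "Python worker" side: writes the marker, then fails before writing the
    // length-prefixed traceback (as if write_with_length itself had raised).
    val worker = new Thread(new Runnable {
      override def run(): Unit = {
        val sock = new Socket("localhost", server.getLocalPort)
        val out = new DataOutputStream(sock.getOutputStream)
        out.writeInt(PYTHON_EXCEPTION_THROWN)
        out.flush()
        Thread.sleep(10000) // keep the socket open but never send the length
        sock.close()
      }
    })
    worker.setDaemon(true)
    worker.start()

    // "PythonRDD / PythonRunner" side: reads the marker, then expects a length.
    val conn = server.accept()
    conn.setSoTimeout(2000) // demo only; the real reader has no timeout and hangs
    val in = new DataInputStream(conn.getInputStream)
    println(s"got marker ${in.readInt()}, waiting for traceback length...")
    try {
      val exLength = in.readInt() // this is where the real code blocks forever
      println(s"traceback length: $exLength")
    } catch {
      case _: SocketTimeoutException =>
        println("stuck waiting for a length that never arrives")
    }
    conn.close()
    server.close()
  }
}
{code}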





[jira] [Updated] (SPARK-21045) Spark executor blocked instead of throwing exception because exception occur when python worker send exception info to Java Gateway in Python 2+

2017-06-12 Thread Joshuawangzj (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-21045?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joshuawangzj updated SPARK-21045:
-
Environment: 
It has problem only in Python 2+. 
Python 3+ is ok.

Summary: Spark executor blocked instead of throwing exception because 
exception occur when python worker send exception info to Java Gateway in 
Python 2+  (was: Spark executor blocked instead of throwing exception because 
exception occur when python worker send exception info to Java Gateway)

> Spark executor blocked instead of throwing exception because exception occur 
> when python worker send exception info to Java Gateway in Python 2+
> 
>
> Key: SPARK-21045
> URL: https://issues.apache.org/jira/browse/SPARK-21045
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark
>Affects Versions: 2.0.1, 2.0.2, 2.1.1
> Environment: It has problem only in Python 2+. 
> Python 3+ is ok.
>Reporter: Joshuawangzj
>
> My pyspark program is always blocking in product yarn cluster. Then I jstack 
> and found :
> {code}
> "Executor task launch worker for task 0" #60 daemon prio=5 os_prio=31 
> tid=0x7fb2f44e3000 nid=0xa003 runnable [0x000123b4a000]
>java.lang.Thread.State: RUNNABLE
> at java.net.SocketInputStream.socketRead0(Native Method)
> at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
> at java.net.SocketInputStream.read(SocketInputStream.java:170)
> at java.net.SocketInputStream.read(SocketInputStream.java:141)
> at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
> at java.io.BufferedInputStream.read(BufferedInputStream.java:265)
> - locked <0x0007acab1c98> (a java.io.BufferedInputStream)
> at java.io.DataInputStream.readInt(DataInputStream.java:387)
> at 
> org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:190)
> at 
> org.apache.spark.api.python.PythonRunner$$anon$1.(PythonRDD.scala:234)
> at 
> org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
> at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
> at org.apache.spark.scheduler.Task.run(Task.scala:99)
> at 
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
> {code}
> It is blocking in socket read.  I view the log on blocking executor and found 
> error:
> {code}
> Traceback (most recent call last):
>   File "/opt/spark/python/lib/pyspark.zip/pyspark/worker.py", line 178, in 
> main
> write_with_length(traceback.format_exc().encode("utf-8"), outfile)
> UnicodeDecodeError: 'ascii' codec can't decode byte 0xe4 in position 618: 
> ordinal not in range(128)
> {code}
> Finally I found the problem:
> {code:title=worker.py|borderStyle=solid}
> # 178 line in spark 2.1.1
> except Exception:
> try:
> write_int(SpecialLengths.PYTHON_EXCEPTION_THROWN, outfile)
> write_with_length(traceback.format_exc().encode("utf-8"), outfile)
> except IOError:
> # JVM close the socket
> pass
> except Exception:
> # Write the error to stderr if it happened while serializing
> print("PySpark worker failed with exception:", file=sys.stderr)
> print(traceback.format_exc(), file=sys.stderr)
> {code}
> when write_with_length(traceback.format_exc().encode("utf-8"), outfile) occur 
> exception like UnicodeDecodeError, the python worker can't send the trace 
> info, but when the PythonRDD get PYTHON_EXCEPTION_THROWN, It should read the 
> trace info length next. So it is blocking.
> {code:title=PythonRDD.scala|borderStyle=solid}
> # 190 line in spark 2.1.1
> case SpecialLengths.PYTHON_EXCEPTION_THROWN =>
>  // Signals that an exception has been thrown in python
>  val exLength = stream.readInt()  // It is possible to be blocked
> {code}
> {color:red}
> We can triggle the bug use simple program:
> {color}
> {code:title=test.py|borderStyle=solid}
> spark = SparkSession.builder.master('local').getOrCreate()
> rdd = spark.sparkContext.parallelize(['中']).map(lambda x: 
> x.encode("utf8"))
> rdd.collect()
> {code}




[jira] [Updated] (SPARK-21045) Spark executor blocked instead of throwing exception because exception occur when python worker send exception info to Java Gateway

2017-06-10 Thread Joshuawangzj (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-21045?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joshuawangzj updated SPARK-21045:
-
Summary: Spark executor blocked instead of throwing exception because 
exception occur when python worker send exception info to Java Gateway  (was: 
Spark executor is blocked instead of throwing exception because exception occur 
when python worker send exception trace stack info to Java Gateway)

> Spark executor blocked instead of throwing exception because exception occur 
> when python worker send exception info to Java Gateway
> ---
>
> Key: SPARK-21045
> URL: https://issues.apache.org/jira/browse/SPARK-21045
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark
>Affects Versions: 2.0.1, 2.0.2, 2.1.1
>Reporter: Joshuawangzj
>
> My pyspark program is always blocking in product yarn cluster. Then I jstack 
> and found :
> {code}
> "Executor task launch worker for task 0" #60 daemon prio=5 os_prio=31 
> tid=0x7fb2f44e3000 nid=0xa003 runnable [0x000123b4a000]
>java.lang.Thread.State: RUNNABLE
> at java.net.SocketInputStream.socketRead0(Native Method)
> at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
> at java.net.SocketInputStream.read(SocketInputStream.java:170)
> at java.net.SocketInputStream.read(SocketInputStream.java:141)
> at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
> at java.io.BufferedInputStream.read(BufferedInputStream.java:265)
> - locked <0x0007acab1c98> (a java.io.BufferedInputStream)
> at java.io.DataInputStream.readInt(DataInputStream.java:387)
> at 
> org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:190)
> at 
> org.apache.spark.api.python.PythonRunner$$anon$1.(PythonRDD.scala:234)
> at 
> org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
> at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
> at org.apache.spark.scheduler.Task.run(Task.scala:99)
> at 
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
> {code}
> It is blocking in socket read.  I view the log on blocking executor and found 
> error:
> {code}
> Traceback (most recent call last):
>   File "/opt/spark/python/lib/pyspark.zip/pyspark/worker.py", line 178, in 
> main
> write_with_length(traceback.format_exc().encode("utf-8"), outfile)
> UnicodeDecodeError: 'ascii' codec can't decode byte 0xe4 in position 618: 
> ordinal not in range(128)
> {code}
> Finally I found the problem:
> {code:title=worker.py|borderStyle=solid}
> # 178 line in spark 2.1.1
> except Exception:
> try:
> write_int(SpecialLengths.PYTHON_EXCEPTION_THROWN, outfile)
> write_with_length(traceback.format_exc().encode("utf-8"), outfile)
> except IOError:
> # JVM close the socket
> pass
> except Exception:
> # Write the error to stderr if it happened while serializing
> print("PySpark worker failed with exception:", file=sys.stderr)
> print(traceback.format_exc(), file=sys.stderr)
> {code}
> when write_with_length(traceback.format_exc().encode("utf-8"), outfile) occur 
> exception like UnicodeDecodeError, the python worker can't send the trace 
> info, but when the PythonRDD get PYTHON_EXCEPTION_THROWN, It should read the 
> trace info length next. So it is blocking.
> {code:title=PythonRDD.scala|borderStyle=solid}
> # 190 line in spark 2.1.1
> case SpecialLengths.PYTHON_EXCEPTION_THROWN =>
>  // Signals that an exception has been thrown in python
>  val exLength = stream.readInt()  // It is possible to be blocked
> {code}
> {color:red}
> We can triggle the bug use simple program:
> {color}
> {code:title=test.py|borderStyle=solid}
> spark = SparkSession.builder.master('local').getOrCreate()
> rdd = spark.sparkContext.parallelize(['中']).map(lambda x: 
> x.encode("utf8"))
> rdd.collect()
> {code}




[jira] [Updated] (SPARK-21045) Spark executor is blocked instead of throwing exception because exception occur when python worker send exception trace stack info to Java Gateway

2017-06-10 Thread Joshuawangzj (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-21045?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joshuawangzj updated SPARK-21045:
-
Description: 
My pyspark program is always blocking in product yarn cluster. Then I jstack 
and found :

{code}
"Executor task launch worker for task 0" #60 daemon prio=5 os_prio=31 
tid=0x7fb2f44e3000 nid=0xa003 runnable [0x000123b4a000]
   java.lang.Thread.State: RUNNABLE
at java.net.SocketInputStream.socketRead0(Native Method)
at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
at java.net.SocketInputStream.read(SocketInputStream.java:170)
at java.net.SocketInputStream.read(SocketInputStream.java:141)
at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
at java.io.BufferedInputStream.read(BufferedInputStream.java:265)
- locked <0x0007acab1c98> (a java.io.BufferedInputStream)
at java.io.DataInputStream.readInt(DataInputStream.java:387)
at 
org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:190)
at 
org.apache.spark.api.python.PythonRunner$$anon$1.(PythonRDD.scala:234)
at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
{code}

It is blocking in socket read.  I view the log on blocking executor and found 
error:

{code}
Traceback (most recent call last):
  File "/opt/spark/python/lib/pyspark.zip/pyspark/worker.py", line 178, in main
write_with_length(traceback.format_exc().encode("utf-8"), outfile)
UnicodeDecodeError: 'ascii' codec can't decode byte 0xe4 in position 618: 
ordinal not in range(128)
{code}

Finally I found the problem:

{code:title=worker.py|borderStyle=solid}
# 178 line in spark 2.1.1
except Exception:
try:
write_int(SpecialLengths.PYTHON_EXCEPTION_THROWN, outfile)
write_with_length(traceback.format_exc().encode("utf-8"), outfile)
except IOError:
# JVM close the socket
pass
except Exception:
# Write the error to stderr if it happened while serializing
print("PySpark worker failed with exception:", file=sys.stderr)
print(traceback.format_exc(), file=sys.stderr)
{code}

when write_with_length(traceback.format_exc().encode("utf-8"), outfile) occur 
exception like UnicodeDecodeError, the python worker can't send the trace info, 
but when the PythonRDD get PYTHON_EXCEPTION_THROWN, It should read the trace 
info length next. So it is blocking.

{code:title=PythonRDD.scala|borderStyle=solid}
# 190 line in spark 2.1.1
case SpecialLengths.PYTHON_EXCEPTION_THROWN =>
 // Signals that an exception has been thrown in python
 val exLength = stream.readInt()  // It is possible to be blocked
{code}

{color:red}
We can triggle the bug use simple program:
{color}
{code:title=test.py|borderStyle=solid}
spark = SparkSession.builder.master('local').getOrCreate()
rdd = spark.sparkContext.parallelize(['中']).map(lambda x: x.encode("utf8"))
rdd.collect()
{code}

  was:
My pyspark program is always blocking in product yarn cluster. Then I jstack 
and found :

{code}
"Executor task launch worker for task 0" #60 daemon prio=5 os_prio=31 
tid=0x7fb2f44e3000 nid=0xa003 runnable [0x000123b4a000]
   java.lang.Thread.State: RUNNABLE
at java.net.SocketInputStream.socketRead0(Native Method)
at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
at java.net.SocketInputStream.read(SocketInputStream.java:170)
at java.net.SocketInputStream.read(SocketInputStream.java:141)
at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
at java.io.BufferedInputStream.read(BufferedInputStream.java:265)
- locked <0x0007acab1c98> (a java.io.BufferedInputStream)
at java.io.DataInputStream.readInt(DataInputStream.java:387)
at 
org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:190)
at 
org.apache.spark.api.python.PythonRunner$$anon$1.(PythonRDD.scala:234)
at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
at 

[jira] [Updated] (SPARK-21045) Spark executor is blocked instead of throwing exception because exception occur when python worker send exception trace stack info to Java Gateway

2017-06-10 Thread Joshuawangzj (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-21045?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joshuawangzj updated SPARK-21045:
-
Description: 
My pyspark program is always blocking in product yarn cluster. Then I jstack 
and found :

{code}
"Executor task launch worker for task 0" #60 daemon prio=5 os_prio=31 
tid=0x7fb2f44e3000 nid=0xa003 runnable [0x000123b4a000]
   java.lang.Thread.State: RUNNABLE
at java.net.SocketInputStream.socketRead0(Native Method)
at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
at java.net.SocketInputStream.read(SocketInputStream.java:170)
at java.net.SocketInputStream.read(SocketInputStream.java:141)
at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
at java.io.BufferedInputStream.read(BufferedInputStream.java:265)
- locked <0x0007acab1c98> (a java.io.BufferedInputStream)
at java.io.DataInputStream.readInt(DataInputStream.java:387)
at 
org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:190)
at 
org.apache.spark.api.python.PythonRunner$$anon$1.(PythonRDD.scala:234)
at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
{code}

It is blocking in socket read.  I view the log on blocking executor and found 
error:

{code}
Traceback (most recent call last):
  File "/opt/spark/python/lib/pyspark.zip/pyspark/worker.py", line 178, in main
write_with_length(traceback.format_exc().encode("utf-8"), outfile)
UnicodeDecodeError: 'ascii' codec can't decode byte 0xe4 in position 618: 
ordinal not in range(128)
{code}

Finally I found the problem:

{code:title=worker.py|borderStyle=solid}
# 178 line in spark 2.1.1
except Exception:
try:
write_int(SpecialLengths.PYTHON_EXCEPTION_THROWN, outfile)
write_with_length(traceback.format_exc().encode("utf-8"), outfile)
except IOError:
# JVM close the socket
pass
except Exception:
# Write the error to stderr if it happened while serializing
print("PySpark worker failed with exception:", file=sys.stderr)
print(traceback.format_exc(), file=sys.stderr)
{code}

when write_with_length(traceback.format_exc().encode("utf-8"), outfile) occur 
exception like UnicodeDecodeError, the python worker can't send the trace info, 
but when the PythonRDD get PYTHON_EXCEPTION_THROWN, It should read the trace 
info length next. So it is blocking.

{code:title=PythonRDD.scala|borderStyle=solid}
# 190 line in spark 2.1.1
case SpecialLengths.PYTHON_EXCEPTION_THROWN =>
 // Signals that an exception has been thrown in python
 val exLength = stream.readInt()  // It is possible to be blocked
{code}

We can triggle the bug use simple program:

{code title=test.py|borderStyle=solid}
spark = SparkSession.builder.master('local').getOrCreate()
rdd = spark.sparkContext.parallelize(['中']).map(lambda x: x.encode("utf8"))
rdd.collect()
{code}



  was:
My pyspark program is always blocking in product yarn cluster. Then I jstack 
and found :

{code}
"Executor task launch worker for task 0" #60 daemon prio=5 os_prio=31 
tid=0x7fb2f44e3000 nid=0xa003 runnable [0x000123b4a000]
   java.lang.Thread.State: RUNNABLE
at java.net.SocketInputStream.socketRead0(Native Method)
at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
at java.net.SocketInputStream.read(SocketInputStream.java:170)
at java.net.SocketInputStream.read(SocketInputStream.java:141)
at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
at java.io.BufferedInputStream.read(BufferedInputStream.java:265)
- locked <0x0007acab1c98> (a java.io.BufferedInputStream)
at java.io.DataInputStream.readInt(DataInputStream.java:387)
at 
org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:190)
at 
org.apache.spark.api.python.PythonRunner$$anon$1.(PythonRDD.scala:234)
at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
at 

[jira] [Updated] (SPARK-21045) Spark executor is blocked instead of throwing exception because exception occur when python worker send exception trace stack info to Java Gateway

2017-06-10 Thread Joshuawangzj (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-21045?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joshuawangzj updated SPARK-21045:
-
Description: 
My pyspark program is always blocking in product yarn cluster. Then I jstack 
and found :

{code}
"Executor task launch worker for task 0" #60 daemon prio=5 os_prio=31 
tid=0x7fb2f44e3000 nid=0xa003 runnable [0x000123b4a000]
   java.lang.Thread.State: RUNNABLE
at java.net.SocketInputStream.socketRead0(Native Method)
at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
at java.net.SocketInputStream.read(SocketInputStream.java:170)
at java.net.SocketInputStream.read(SocketInputStream.java:141)
at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
at java.io.BufferedInputStream.read(BufferedInputStream.java:265)
- locked <0x0007acab1c98> (a java.io.BufferedInputStream)
at java.io.DataInputStream.readInt(DataInputStream.java:387)
at 
org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:190)
at 
org.apache.spark.api.python.PythonRunner$$anon$1.(PythonRDD.scala:234)
at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
{code}

It is blocking in socket read.  I view the log on blocking executor and found 
error:

{code}
Traceback (most recent call last):
  File "/opt/spark/python/lib/pyspark.zip/pyspark/worker.py", line 178, in main
write_with_length(traceback.format_exc().encode("utf-8"), outfile)
UnicodeDecodeError: 'ascii' codec can't decode byte 0xe4 in position 618: 
ordinal not in range(128)
{code}

Finally I found the problem:

{code:title=worker.py|borderStyle=solid}
# 178 line in spark 2.1.1
except Exception:
try:
write_int(SpecialLengths.PYTHON_EXCEPTION_THROWN, outfile)
write_with_length(traceback.format_exc().encode("utf-8"), outfile)
except IOError:
# JVM close the socket
pass
except Exception:
# Write the error to stderr if it happened while serializing
print("PySpark worker failed with exception:", file=sys.stderr)
print(traceback.format_exc(), file=sys.stderr)
{code}

when write_with_length(traceback.format_exc().encode("utf-8"), outfile) occur 
exception like UnicodeDecodeError, the python worker can't send the trace info, 
but when the PythonRDD get PYTHON_EXCEPTION_THROWN, It should read the trace 
info length next. So it is blocking.

{code:title=PythonRDD.scala|borderStyle=solid}
# 190 line in spark 2.1.1
case SpecialLengths.PYTHON_EXCEPTION_THROWN =>
 // Signals that an exception has been thrown in python
 val exLength = stream.readInt()  // It is possible to be blocked
{code}

{color:red}
We can triggle the bug use simple program:
{color}

{code title=test.py|borderStyle=solid}
spark = SparkSession.builder.master('local').getOrCreate()
rdd = spark.sparkContext.parallelize(['中']).map(lambda x: x.encode("utf8"))
rdd.collect()
{code}



  was:
My pyspark program is always blocking in product yarn cluster. Then I jstack 
and found :

{code}
"Executor task launch worker for task 0" #60 daemon prio=5 os_prio=31 
tid=0x7fb2f44e3000 nid=0xa003 runnable [0x000123b4a000]
   java.lang.Thread.State: RUNNABLE
at java.net.SocketInputStream.socketRead0(Native Method)
at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
at java.net.SocketInputStream.read(SocketInputStream.java:170)
at java.net.SocketInputStream.read(SocketInputStream.java:141)
at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
at java.io.BufferedInputStream.read(BufferedInputStream.java:265)
- locked <0x0007acab1c98> (a java.io.BufferedInputStream)
at java.io.DataInputStream.readInt(DataInputStream.java:387)
at 
org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:190)
at 
org.apache.spark.api.python.PythonRunner$$anon$1.(PythonRDD.scala:234)
at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
at 

[jira] [Updated] (SPARK-21045) Spark executor is blocked instead of throwing exception because exception occur when python worker send exception trace stack info to Java Gateway

2017-06-10 Thread Joshuawangzj (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-21045?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joshuawangzj updated SPARK-21045:
-
Description: 
My pyspark program is always blocking in product yarn cluster. Then I jstack 
and found :

{code}
"Executor task launch worker for task 0" #60 daemon prio=5 os_prio=31 
tid=0x7fb2f44e3000 nid=0xa003 runnable [0x000123b4a000]
   java.lang.Thread.State: RUNNABLE
at java.net.SocketInputStream.socketRead0(Native Method)
at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
at java.net.SocketInputStream.read(SocketInputStream.java:170)
at java.net.SocketInputStream.read(SocketInputStream.java:141)
at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
at java.io.BufferedInputStream.read(BufferedInputStream.java:265)
- locked <0x0007acab1c98> (a java.io.BufferedInputStream)
at java.io.DataInputStream.readInt(DataInputStream.java:387)
at 
org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:190)
at 
org.apache.spark.api.python.PythonRunner$$anon$1.(PythonRDD.scala:234)
at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
{code}

It is blocking in socket read.  I view the log on blocking executor and found 
error:

{code}
Traceback (most recent call last):
  File "/opt/spark/python/lib/pyspark.zip/pyspark/worker.py", line 178, in main
write_with_length(traceback.format_exc().encode("utf-8"), outfile)
UnicodeDecodeError: 'ascii' codec can't decode byte 0xe4 in position 618: 
ordinal not in range(128)
{code}

Finally I found the problem:

{code:title=worker.py|borderStyle=solid}
# 178 line in spark 2.1.1
except Exception:
try:
write_int(SpecialLengths.PYTHON_EXCEPTION_THROWN, outfile)
write_with_length(traceback.format_exc().encode("utf-8"), outfile)
except IOError:
# JVM close the socket
pass
except Exception:
# Write the error to stderr if it happened while serializing
print("PySpark worker failed with exception:", file=sys.stderr)
print(traceback.format_exc(), file=sys.stderr)
{code}

when write_with_length(traceback.format_exc().encode("utf-8"), outfile) occur 
exception like UnicodeDecodeError, the python worker can't send the trace info, 
but when the PythonRDD get PYTHON_EXCEPTION_THROWN, It should read the trace 
info length next. So it is blocking.

{code:title=PythonRDD.scala|borderStyle=solid}
# 190 line in spark 2.1.1
case SpecialLengths.PYTHON_EXCEPTION_THROWN =>
 // Signals that an exception has been thrown in python
 val exLength = stream.readInt()  // It is possible to be blocked
{code}

{color:red}
We can triggle the bug use simple program:
{color}
{code title=test.py|borderStyle=solid}
spark = SparkSession.builder.master('local').getOrCreate()
rdd = spark.sparkContext.parallelize(['中']).map(lambda x: x.encode("utf8"))
rdd.collect()
{code}



  was:
My pyspark program is always blocking in product yarn cluster. Then I jstack 
and found :

{code}
"Executor task launch worker for task 0" #60 daemon prio=5 os_prio=31 
tid=0x7fb2f44e3000 nid=0xa003 runnable [0x000123b4a000]
   java.lang.Thread.State: RUNNABLE
at java.net.SocketInputStream.socketRead0(Native Method)
at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
at java.net.SocketInputStream.read(SocketInputStream.java:170)
at java.net.SocketInputStream.read(SocketInputStream.java:141)
at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
at java.io.BufferedInputStream.read(BufferedInputStream.java:265)
- locked <0x0007acab1c98> (a java.io.BufferedInputStream)
at java.io.DataInputStream.readInt(DataInputStream.java:387)
at 
org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:190)
at 
org.apache.spark.api.python.PythonRunner$$anon$1.(PythonRDD.scala:234)
at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
at 

[jira] [Updated] (SPARK-21045) Spark executor is blocked instead of throwing exception because exception occur when python worker send exception trace stack info to Java Gateway

2017-06-10 Thread Joshuawangzj (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-21045?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joshuawangzj updated SPARK-21045:
-
Description: 
My pyspark program is always blocking in product yarn cluster. Then I jstack 
and found :

{code}
"Executor task launch worker for task 0" #60 daemon prio=5 os_prio=31 
tid=0x7fb2f44e3000 nid=0xa003 runnable [0x000123b4a000]
   java.lang.Thread.State: RUNNABLE
at java.net.SocketInputStream.socketRead0(Native Method)
at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
at java.net.SocketInputStream.read(SocketInputStream.java:170)
at java.net.SocketInputStream.read(SocketInputStream.java:141)
at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
at java.io.BufferedInputStream.read(BufferedInputStream.java:265)
- locked <0x0007acab1c98> (a java.io.BufferedInputStream)
at java.io.DataInputStream.readInt(DataInputStream.java:387)
at 
org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:190)
at 
org.apache.spark.api.python.PythonRunner$$anon$1.(PythonRDD.scala:234)
at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
{code}

It is blocking in socket read.  I view the log on blocking executor and found 
error:

{code}
Traceback (most recent call last):
  File "/opt/spark/python/lib/pyspark.zip/pyspark/worker.py", line 178, in main
write_with_length(traceback.format_exc().encode("utf-8"), outfile)
UnicodeDecodeError: 'ascii' codec can't decode byte 0xe4 in position 618: 
ordinal not in range(128)
{code}

Finally I found the problem:

{code:title=worker.py|borderStyle=solid}
# 178 line in spark 2.1.1
except Exception:
try:
write_int(SpecialLengths.PYTHON_EXCEPTION_THROWN, outfile)
write_with_length(traceback.format_exc().encode("utf-8"), outfile)
except IOError:
# JVM close the socket
pass
except Exception:
# Write the error to stderr if it happened while serializing
print("PySpark worker failed with exception:", file=sys.stderr)
print(traceback.format_exc(), file=sys.stderr)
{code}

when write_with_length(traceback.format_exc().encode("utf-8"), outfile) occur 
exception like UnicodeDecodeError, the python worker can't send the trace info, 
but when the PythonRDD get PYTHON_EXCEPTION_THROWN, It should read the trace 
info length next. So it is blocking.

{code:title=PythonRDD.scala|borderStyle=solid}
# 190 line in spark 2.1.1
case SpecialLengths.PYTHON_EXCEPTION_THROWN =>
 // Signals that an exception has been thrown in python
 val exLength = stream.readInt()  // It is possible to be blocked
{code}



  was:
My pyspark program is always blocking in product yarn cluster. Then I jstack 
and found :

{code}
"Executor task launch worker for task 0" #60 daemon prio=5 os_prio=31 
tid=0x7fb2f44e3000 nid=0xa003 runnable [0x000123b4a000]
   java.lang.Thread.State: RUNNABLE
at java.net.SocketInputStream.socketRead0(Native Method)
at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
at java.net.SocketInputStream.read(SocketInputStream.java:170)
at java.net.SocketInputStream.read(SocketInputStream.java:141)
at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
at java.io.BufferedInputStream.read(BufferedInputStream.java:265)
- locked <0x0007acab1c98> (a java.io.BufferedInputStream)
at java.io.DataInputStream.readInt(DataInputStream.java:387)
at 
org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:190)
at 
org.apache.spark.api.python.PythonRunner$$anon$1.(PythonRDD.scala:234)
at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at 

[jira] [Created] (SPARK-21045) Spark executor is blocked instead of throwing exception because exception occur when python worker send exception trace stack info to Java Gateway

2017-06-10 Thread Joshuawangzj (JIRA)
Joshuawangzj created SPARK-21045:


 Summary: Spark executor is blocked instead of throwing exception 
because exception occur when python worker send exception trace stack info to 
Java Gateway
 Key: SPARK-21045
 URL: https://issues.apache.org/jira/browse/SPARK-21045
 Project: Spark
  Issue Type: Bug
  Components: PySpark
Affects Versions: 2.1.1, 2.0.2, 2.0.1
Reporter: Joshuawangzj


My PySpark program always blocks in our production YARN cluster. I ran jstack 
and found:

{code}
"Executor task launch worker for task 0" #60 daemon prio=5 os_prio=31 
tid=0x7fb2f44e3000 nid=0xa003 runnable [0x000123b4a000]
   java.lang.Thread.State: RUNNABLE
at java.net.SocketInputStream.socketRead0(Native Method)
at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
at java.net.SocketInputStream.read(SocketInputStream.java:170)
at java.net.SocketInputStream.read(SocketInputStream.java:141)
at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
at java.io.BufferedInputStream.read(BufferedInputStream.java:265)
- locked <0x0007acab1c98> (a java.io.BufferedInputStream)
at java.io.DataInputStream.readInt(DataInputStream.java:387)
at 
org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:190)
at 
org.apache.spark.api.python.PythonRunner$$anon$1.(PythonRDD.scala:234)
at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
{code}

It is blocked in a socket read. I checked the log on the blocked executor and 
found this error:

{code}
Traceback (most recent call last):
  File 
"/Users/wangzejie/software/spark/python/lib/pyspark.zip/pyspark/worker.py", 
line 178, in main
write_with_length(traceback.format_exc().encode("utf-8"), outfile)
UnicodeDecodeError: 'ascii' codec can't decode byte 0xe4 in position 618: 
ordinal not in range(128)
{code}

Finally I found the problem:

{code:title=worker.py|borderStyle=solid}
# 178 line in spark 2.1.1
except Exception:
try:
write_int(SpecialLengths.PYTHON_EXCEPTION_THROWN, outfile)
write_with_length(traceback.format_exc().encode("utf-8"), outfile)
except IOError:
# JVM close the socket
pass
except Exception:
# Write the error to stderr if it happened while serializing
print("PySpark worker failed with exception:", file=sys.stderr)
print(traceback.format_exc(), file=sys.stderr)
{code}

When write_with_length(traceback.format_exc().encode("utf-8"), outfile) itself 
raises an exception such as UnicodeDecodeError, the Python worker cannot send the 
traceback, but once PythonRDD has read PYTHON_EXCEPTION_THROWN it expects to read 
the traceback length next. So it blocks.

{code:title=PythonRDD.scala|borderStyle=solid}
# 190 line in spark 2.1.1
case SpecialLengths.PYTHON_EXCEPTION_THROWN =>
 // Signals that an exception has been thrown in python
 val exLength = stream.readInt()  // It is possible to be blocked
{code}




