[jira] [Updated] (SPARK-26549) PySpark worker reuse take no effect for parallelize xrange

2019-01-07 Thread Yuanjian Li (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26549?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yuanjian Li updated SPARK-26549:

Description: 
During [the follow-up work|https://github.com/apache/spark/pull/23435#issuecomment-451079886]
for the PySpark worker reuse scenario, we found that worker reuse takes no effect
for `sc.parallelize(xrange(...))`.
This happens because the specialized rdd.parallelize path for xrange (SPARK-4398)
generates the data from xrange directly and never consumes the passed-in iterator.
Skipping that iterator breaks the end-of-stream check in the Python worker and, as
a result, the worker is never reused.
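A quick way to observe this (a sketch, assuming a local SparkContext bound to `sc`,
the default `spark.python.worker.reuse=true`, and Python 2 where `xrange` is a
builtin) is to compare the Python worker PIDs across two identical jobs:
{code:python}
# Sketch: with worker reuse working, the PID sets of two identical jobs overlap;
# with parallelize(xrange(...)) fresh workers are forked for every job.
import os

def worker_pids(rdd):
    # Each task reports the PID of the Python worker that executed it.
    return set(rdd.map(lambda x: os.getpid()).collect())

range_rdd = sc.parallelize(range(100), 4)
xrange_rdd = sc.parallelize(xrange(100), 4)

# range: the second run largely reuses the PIDs of the first run.
print(worker_pids(range_rdd), worker_pids(range_rdd))
# xrange: the PIDs differ between runs because the workers exit and are re-forked.
print(worker_pids(xrange_rdd), worker_pids(xrange_rdd))
{code}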


The relevant code blocks and more details are listed below.
The current specialized logic for xrange ignores the passed-in iterator; see
context.py:
{code:python}
if isinstance(c, xrange):
    ...
    def f(split, iterator):
        return xrange(getStart(split), getStart(split + 1), step)
    ...
    return self.parallelize([], numSlices).mapPartitionsWithIndex(f)
{code}
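For illustration, here is a hypothetical stand-alone sketch of what this branch
does (the names `slice_xrange`, `getStart` and `step` are illustrative; the real
helpers live inside SparkContext.parallelize, and this assumes Python 2 where
`xrange` exists): each partition's data is regenerated from the range bounds, so
`iterator` is never touched.
{code:python}
# Hypothetical Python 2 sketch of the xrange specialization; not PySpark code.
def slice_xrange(c, numSlices):
    size = len(c)
    step = c[1] - c[0] if size > 1 else 1
    start0 = c[0]

    def getStart(split):
        # Element offset at which partition `split` begins.
        return start0 + (split * size // numSlices) * step

    def f(split, iterator):
        # `iterator` is ignored: the data is rebuilt from the range bounds.
        return xrange(getStart(split), getStart(split + 1), step)

    return [list(f(i, iter([]))) for i in range(numSlices)]

print(slice_xrange(xrange(0, 10, 2), 3))  # [[0], [2, 4], [6, 8]]
{code}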
As a result, the end-of-stream check reads an unexpected value -1, which refers to
END_OF_DATA_SECTION. See the code in worker.py:
{code:python}
# check end of stream
if read_int(infile) == SpecialLengths.END_OF_STREAM:
    write_int(SpecialLengths.END_OF_STREAM, outfile)
else:
    # write a different value to tell JVM to not reuse this worker
    write_int(SpecialLengths.END_OF_DATA_SECTION, outfile)
    sys.exit(-1)
{code}
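To make the failure mode concrete, here is a simplified stand-alone model of the
framed stream the worker reads. Only END_OF_DATA_SECTION = -1 is stated above;
END_OF_STREAM = -4 is assumed from pyspark.serializers.SpecialLengths, and the real
protocol also carries timing data and accumulator updates that are omitted here.
{code:python}
# Standalone illustration, not PySpark code.
import io
import struct

END_OF_DATA_SECTION = -1
END_OF_STREAM = -4   # assumed value; see SpecialLengths in pyspark.serializers

def write_int(value, stream):
    stream.write(struct.pack("!i", value))

def read_int(stream):
    return struct.unpack("!i", stream.read(4))[0]

# parallelize(xrange(...)) becomes parallelize([], numSlices).mapPartitionsWithIndex(f),
# so the data section sent to each worker is empty: just the two markers.
buf = io.BytesIO()
write_int(END_OF_DATA_SECTION, buf)
write_int(END_OF_STREAM, buf)

# If f consumed the iterator, load_stream would read up to END_OF_DATA_SECTION
# (raising EOFError internally), and the next read_int would see END_OF_STREAM.
buf.seek(0)
assert read_int(buf) == END_OF_DATA_SECTION
assert read_int(buf) == END_OF_STREAM      # reuse handshake would succeed

# But f ignores the iterator, so nothing has consumed the data section yet:
# the "check end of stream" read_int returns -1 (END_OF_DATA_SECTION) and the
# worker exits instead of being reused.
buf.seek(0)
assert read_int(buf) == END_OF_DATA_SECTION   # the unexpected -1 described above
{code}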
The code works well for parallelize(range) because END_OF_DATA_SECTION has already
been handled while loading the iterator from the socket stream; see the code in
FramedSerializer:
{code:python}
def load_stream(self, stream):
    while True:
        try:
            yield self._read_with_length(stream)
        except EOFError:
            return
...
def _read_with_length(self, stream):
    length = read_int(stream)
    if length == SpecialLengths.END_OF_DATA_SECTION:
        raise EOFError  # END_OF_DATA_SECTION raises EOFError here, caught in load_stream
    elif length == SpecialLengths.NULL:
        return None
    obj = stream.read(length)
    if len(obj) < length:
        raise EOFError
    return self.loads(obj)
{code}
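For illustration only, a sketch of one possible direction (an assumption, not
necessarily the actual fix): if the specialized function drained the unused
iterator before returning its xrange, the empty data section and its
END_OF_DATA_SECTION marker would be consumed through the serializer as usual, and
the end-of-stream check above would see END_OF_STREAM again.
{code:python}
# Hypothetical sketch against the xrange branch of context.py quoted above;
# getStart and step come from that branch.
def f(split, iterator):
    # Drain the (empty) passed-in iterator so the data section, including its
    # END_OF_DATA_SECTION marker, is read through the serializer; the worker's
    # end-of-stream check then sees END_OF_STREAM and the worker can be reused.
    for _ in iterator:
        pass
    return xrange(getStart(split), getStart(split + 1), step)
{code}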


> PySpark worker reuse take no effect for parallelize xrange
> --
>
> Key: SPARK-26549
> URL: https://issues.apache.org/jira/browse/SPARK-26549
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark
>Affects Versions: 3.0.0
>Reporter: Yuanjian Li
>Priority: Major
>

[jira] [Updated] (SPARK-26549) PySpark worker reuse take no effect for parallelize xrange

2019-01-07 Thread Yuanjian Li (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26549?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yuanjian Li updated SPARK-26549:

Summary: PySpark worker reuse take no effect for parallelize xrange  (was: 
PySpark worker reuse take no effect for Python3)

> PySpark worker reuse take no effect for parallelize xrange
> --
>
> Key: SPARK-26549
> URL: https://issues.apache.org/jira/browse/SPARK-26549
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark
>Affects Versions: 3.0.0
>Reporter: Yuanjian Li
>Priority: Major
>
> During [the follow-up work|https://github.com/apache/spark/pull/23435#issuecomment-451079886]
> for the PySpark worker reuse scenario, we found that worker reuse takes no effect
> for Python3 while it works properly for Python2 and PyPy.
> It happened because, while the Python worker checks the end of the stream in
> Python3, we got an unexpected value -1, which refers to END_OF_DATA_SECTION.
> See the code in worker.py:
> {code:python}
> # check end of stream
> if read_int(infile) == SpecialLengths.END_OF_STREAM:
>     write_int(SpecialLengths.END_OF_STREAM, outfile)
> else:
>     # write a different value to tell JVM to not reuse this worker
>     write_int(SpecialLengths.END_OF_DATA_SECTION, outfile)
>     sys.exit(-1)
> {code}
> The code works well for Python2 and PyPy because END_OF_DATA_SECTION has already
> been handled while loading the iterator from the socket stream; see the code in
> FramedSerializer:
> {code:python}
> def load_stream(self, stream):
>     while True:
>         try:
>             yield self._read_with_length(stream)
>         except EOFError:
>             return
> ...
> def _read_with_length(self, stream):
>     length = read_int(stream)
>     if length == SpecialLengths.END_OF_DATA_SECTION:
>         raise EOFError  # END_OF_DATA_SECTION raises EOFError here, caught in load_stream
>     elif length == SpecialLengths.NULL:
>         return None
>     obj = stream.read(length)
>     if len(obj) < length:
>         raise EOFError
>     return self.loads(obj)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org