[jira] [Updated] (SPARK-26549) PySpark worker reuse take no effect for parallelize xrange
[ https://issues.apache.org/jira/browse/SPARK-26549?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Yuanjian Li updated SPARK-26549:
--------------------------------
    Description:

During [the follow-up work|https://github.com/apache/spark/pull/23435#issuecomment-451079886] on the PySpark worker reuse scenario, we found that worker reuse takes no effect for `sc.parallelize(xrange(...))`.

This happens because the specialized rdd.parallelize path for xrange (SPARK-4398) generates its data directly from xrange and never consumes the passed-in iterator. Leaving that iterator unconsumed breaks the end-of-stream check in the Python worker, which in turn stops worker reuse from taking effect. The relevant code blocks and more details are listed below.

The current xrange specialization in context.py ignores the passed-in iterator:
{code:python}
if isinstance(c, xrange):
    ...
    def f(split, iterator):
        return xrange(getStart(split), getStart(split + 1), step)
    ...
    return self.parallelize([], numSlices).mapPartitionsWithIndex(f)
{code}

As a result, the end-of-stream check reads an unexpected value -1, which is END_OF_DATA_SECTION. See the code in worker.py:
{code:python}
# check end of stream
if read_int(infile) == SpecialLengths.END_OF_STREAM:
    write_int(SpecialLengths.END_OF_STREAM, outfile)
else:
    # write a different value to tell JVM to not reuse this worker
    write_int(SpecialLengths.END_OF_DATA_SECTION, outfile)
    sys.exit(-1)
{code}

The code works well for parallelize(range) because END_OF_DATA_SECTION is consumed while the iterator is loaded from the socket stream. See the code in FramedSerializer:
{code:python}
def load_stream(self, stream):
    while True:
        try:
            yield self._read_with_length(stream)
        except EOFError:
            return

...

def _read_with_length(self, stream):
    length = read_int(stream)
    if length == SpecialLengths.END_OF_DATA_SECTION:
        # END_OF_DATA_SECTION raises EOFError here, which is caught in load_stream
        raise EOFError
    elif length == SpecialLengths.NULL:
        return None
    obj = stream.read(length)
    if len(obj) < length:
        raise EOFError
    return self.loads(obj)
{code}
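To make the stream mechanics concrete, here is a toy simulation of the framing (an illustrative sketch only; it simplifies Spark's actual serializer classes, and the END_OF_STREAM value of -4 is taken from PySpark's SpecialLengths for illustration). It shows why draining the iterator decides which marker the end-of-stream check sees:
{code:python}
# Toy simulation of the framed protocol (illustrative only, not
# Spark's real classes). END_OF_DATA_SECTION is -1 as in the issue;
# END_OF_STREAM = -4 mirrors PySpark's SpecialLengths.
import io
import struct

END_OF_DATA_SECTION = -1
END_OF_STREAM = -4

def write_int(value, stream):
    stream.write(struct.pack("!i", value))

def read_int(stream):
    return struct.unpack("!i", stream.read(4))[0]

def load_stream(stream):
    # Mimics FramedSerializer.load_stream: reading END_OF_DATA_SECTION
    # ends the iteration, i.e. the -1 marker gets consumed here.
    while True:
        length = read_int(stream)
        if length == END_OF_DATA_SECTION:
            return
        yield stream.read(length)

buf = io.BytesIO()
write_int(5, buf)
buf.write(b"hello")
write_int(END_OF_DATA_SECTION, buf)
write_int(END_OF_STREAM, buf)
buf.seek(0)

# range case: the data iterator is drained, swallowing the -1 marker,
# so the final end-of-stream check sees END_OF_STREAM and reuse works.
assert list(load_stream(buf)) == [b"hello"]
assert read_int(buf) == END_OF_STREAM

# xrange case: load_stream is never touched, so the end-of-stream
# check reads -1 (END_OF_DATA_SECTION) instead and kills the worker.
{code}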
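For completeness, a minimal reproduction sketch (a hypothetical snippet, not taken from the issue; it assumes spark.python.worker.reuse=true, which is the default): a reused worker keeps its PID across jobs, so comparing PIDs between two jobs exposes the bug.
{code:python}
# Hypothetical reproduction sketch (not from the issue). With
# spark.python.worker.reuse=true (the default), a reused worker keeps
# its PID across jobs; the xrange path breaks this.
import os
from pyspark import SparkContext

try:
    xrange  # Python 2 builtin
except NameError:
    xrange = range  # Python 3's range takes the same specialized path

sc = SparkContext("local[1]", "SPARK-26549-repro")

def worker_pid(_):
    return os.getpid()

first = set(sc.parallelize(xrange(10), 1).map(worker_pid).collect())
second = set(sc.parallelize(xrange(10), 1).map(worker_pid).collect())

# Expected with working reuse: first == second.
# Observed with this bug: different PIDs, because each worker hit the
# unexpected -1, wrote END_OF_DATA_SECTION back and exited with -1.
print(first, second, "reused" if first == second else "not reused")
{code}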
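One possible direction for a fix (a sketch of the idea only; the actual change belongs to the linked follow-up work and may differ) is to have the specialized function drain the passed-in empty iterator before returning, so FramedSerializer.load_stream consumes END_OF_DATA_SECTION exactly as it does in the range case:
{code:python}
# Sketch of a possible fix in context.py (illustrative; the committed
# change may differ). getStart, step and xrange are the same names as
# in the specialization snippet above.
def f(split, iterator):
    # Drain the (empty) iterator: this is what actually pulls the
    # -1 marker off the socket via the EOFError path shown above.
    for _ in iterator:
        pass
    return xrange(getStart(split), getStart(split + 1), step)
{code}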
> PySpark worker reuse take no effect for parallelize xrange
> -----------------------------------------------------------
>
>                 Key: SPARK-26549
>                 URL: https://issues.apache.org/jira/browse/SPARK-26549
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark
>    Affects Versions: 3.0.0
>            Reporter: Yuanjian Li
>            Priority: Major
[jira] [Updated] (SPARK-26549) PySpark worker reuse take no effect for parallelize xrange

[ https://issues.apache.org/jira/browse/SPARK-26549?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Yuanjian Li updated SPARK-26549:
--------------------------------
    Summary: PySpark worker reuse take no effect for parallelize xrange  (was: PySpark worker reuse take no effect for Python3)