Github user BryanCutler commented on the issue:
https://github.com/apache/spark/pull/20838
Hi @cclauss, sorry for the frustration. I looked into the test, and it was kind of a pain to get working right - which is probably why it wasn't done in the first place ;)
Here are my modifications for `test_slice`, and it passes for me under Python 3:
```python
def test_slice(self):
    """Basic operation test for DStream.slice."""
    import datetime as dt
    self.ssc = StreamingContext(self.sc, 1.0)
    self.ssc.remember(4.0)
    input = [[1], [2], [3], [4]]
    stream = self.ssc.queueStream([self.sc.parallelize(d, 1) for d in input])

    # Record the batch time of each RDD as it is processed,
    # up to the number of input batches.
    time_vals = []

    def get_times(t, rdd):
        if rdd and len(time_vals) < len(input):
            time_vals.append(t)

    stream.foreachRDD(get_times)
    self.ssc.start()
    self.wait_for(time_vals, 4)  # wait until all four batch times are recorded
    begin_time = time_vals[0]

    def get_sliced(begin_delta, end_delta):
        # Slice the stream relative to the first batch time and
        # flatten the collected results into a single list.
        begin = begin_time + dt.timedelta(seconds=begin_delta)
        end = begin_time + dt.timedelta(seconds=end_delta)
        rdds = stream.slice(begin, end)
        result_list = [rdd.collect() for rdd in rdds]
        return [r for result in result_list for r in result]

    self.assertEqual(set([1]), set(get_sliced(0, 0)))
    self.assertEqual(set([2, 3]), set(get_sliced(1, 2)))
    self.assertEqual(set([2, 3, 4]), set(get_sliced(1, 4)))
    self.assertEqual(set([1, 2, 3, 4]), set(get_sliced(0, 4)))
```
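For context, the `self.wait_for(...)` call above is the polling helper the streaming test case already provides. As a rough sketch of what that kind of helper does (the parameter names and the 10-second default timeout here are my own assumptions for illustration, not the actual base-class code):
```python
import time

def wait_for(result, n, timeout=10):
    # Illustrative sketch only: poll until `result` holds at least `n` items
    # or `timeout` seconds elapse, then report if we gave up early.
    start_time = time.time()
    while len(result) < n and time.time() - start_time < timeout:
        time.sleep(0.01)
    if len(result) < n:
        print("timed out after %s seconds with only %d items" % (timeout, len(result)))
```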
If you want to put that in, I have some time now and can help you get this merged, or if you prefer, I can finish it up and still assign it to you.