Github user jerryshao commented on a diff in the pull request:
https://github.com/apache/spark/pull/7185#discussion_r34003101
--- Diff: python/pyspark/streaming/dstream.py ---
@@ -602,13 +602,17 @@ class TransformedDStream(DStream):
Multiple continuous transformations of DStream can be combined into
one transformation.
"""
- def __init__(self, prev, func):
+ def __init__(self, prev, func, transformFunc=None):
--- End diff ---
Hi @tdas, my previous approach looked like this (the subclass way):
```python
class TransformedDStream(DStream):
    .....

    def transformFunc(self, ctx, func, *deserializers):
        # Hook that subclasses can override to customize how the
        # TransformFunction is built.
        return TransformFunction(ctx, func, deserializers)
```
```python
class KafkaTransformedDStream(TransformedDStream):
    def __init__(self, prev, func):
        TransformedDStream.__init__(self, prev, func)

    def transformFunc(self, ctx, func, *deserializers):
        # Override the hook to wrap the underlying Java RDD as a KafkaRDD
        # on the Python side.
        return TransformFunction(ctx, func, deserializers) \
            .rdd_wrapper(lambda jrdd, ctx, ser: KafkaRDD(jrdd, ctx, ser))
```
This leads to the following exception:
```
Traceback (most recent call last):
  File "/mnt/data/project/apache-spark/python/pyspark/streaming/util.py", line 62, in call
    r = self.func(t, *rdds)
  File "/mnt/data/project/apache-spark/python/pyspark/streaming/tests.py", line 99, in get_output
    r = rdd.collect()
  File "/mnt/data/project/apache-spark/python/pyspark/rdd.py", line 758, in collect
    port = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
  File "/mnt/data/project/apache-spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__
    self.target_id, self.name)
  File "/mnt/data/project/apache-spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 300, in get_return_value
    format(target_id, '.', name), value)
Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 44.0 failed 1 times, most recent failure: Lost task 0.0 in stage 44.0 (TID 44, localhost): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/mnt/data/project/apache-spark/python/lib/pyspark.zip/pyspark/worker.py", line 111, in main
    process()
  File "/mnt/data/project/apache-spark/python/lib/pyspark.zip/pyspark/worker.py", line 105, in process
    iterator = deserializer.load_stream(infile)
AttributeError: 'tuple' object has no attribute 'load_stream'
    at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:138)
    at org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:179)
    at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:97)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
    at org.apache.spark.scheduler.Task.run(Task.scala:70)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
```
I guess this is due to a serialization issue, so I worked around it in the way the current change does.
I will run some more tests to see whether there is another way.
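
For reference, a minimal sketch of how the constructor-parameter workaround could be wired in (the `_make_transform_func` helper and attribute handling below are illustrative assumptions for this comment, not the exact code in this PR):
```python
class TransformedDStream(DStream):
    .....
    def __init__(self, prev, func, transformFunc=None):
        .....
        # Instead of a subclass override, the caller may inject a factory
        # that builds the TransformFunction.
        self.transformFunc = transformFunc

    def _make_transform_func(self, ctx, func, *deserializers):
        # Hypothetical helper: prefer the injected factory, otherwise fall
        # back to the default TransformFunction.
        if self.transformFunc is not None:
            return self.transformFunc(ctx, func, *deserializers)
        return TransformFunction(ctx, func, *deserializers)
```
With this, the Kafka call site would not need a DStream subclass at all, e.g. (call-site sketch, names illustrative):
```python
# Inject a factory that wraps the Java RDD as a KafkaRDD.
stream = TransformedDStream(
    prev, func,
    transformFunc=lambda ctx, f, *sers: TransformFunction(ctx, f, *sers)
        .rdd_wrapper(lambda jrdd, ctx, ser: KafkaRDD(jrdd, ctx, ser)))
```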