GitHub user jerryshao commented on a diff in the pull request:

    https://github.com/apache/spark/pull/7185#discussion_r34003101
  
    --- Diff: python/pyspark/streaming/dstream.py ---
    @@ -602,13 +602,17 @@ class TransformedDStream(DStream):
         Multiple continuous transformations of DStream can be combined into
         one transformation.
         """
    -    def __init__(self, prev, func):
    +    def __init__(self, prev, func, transformFunc=None):
    --- End diff ---
    
    Hi @tdas, my previous approach was like this (the subclass way):
    
    ```python
    class TransformedDStream(DStream):
    
        # ... (rest of the class unchanged)
    
        def transformFunc(self, ctx, func, *deserializers):
            return TransformFunction(ctx, func, deserializers)
    ```
    
    ```python
    
    class KafkaTransformedDStream(TransformedDStream):
    
        def __init__(self, prev, func):
            TransformedDStream.__init__(self, prev, func)
    
        def transformFunc(self, ctx, func, *deserializers):
            return TransformFunction(ctx, func, deserializers) \
                .rdd_wrapper(lambda jrdd, ctx, ser: KafkaRDD(jrdd, ctx, ser))
    ```
    
    This leads to the following exception:
    
    ```
    Traceback (most recent call last):
      File "/mnt/data/project/apache-spark/python/pyspark/streaming/util.py", line 62, in call
        r = self.func(t, *rdds)
      File "/mnt/data/project/apache-spark/python/pyspark/streaming/tests.py", line 99, in get_output
        r = rdd.collect()
      File "/mnt/data/project/apache-spark/python/pyspark/rdd.py", line 758, in collect
        port = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
      File "/mnt/data/project/apache-spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__
        self.target_id, self.name)
      File "/mnt/data/project/apache-spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 300, in get_return_value
        format(target_id, '.', name), value)
    Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
    : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 44.0 failed 1 times, most recent failure: Lost task 0.0 in stage 44.0 (TID 44, localhost): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
      File "/mnt/data/project/apache-spark/python/lib/pyspark.zip/pyspark/worker.py", line 111, in main
        process()
      File "/mnt/data/project/apache-spark/python/lib/pyspark.zip/pyspark/worker.py", line 105, in process
        iterator = deserializer.load_stream(infile)
    AttributeError: 'tuple' object has no attribute 'load_stream'

        at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:138)
        at org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:179)
        at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:97)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
        at org.apache.spark.scheduler.Task.run(Task.scala:70)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
    ```
    
    I guess this is due to a serialization issue, so I worked around it the way the current change does: the transform function is passed in from the outside instead of being provided by an overridden method (a rough sketch of that idea is below).
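
    To make that concrete, here is a rough, untested sketch of the parameter-based approach. It assumes the `TransformFunction`, `TransformedDStream` and `KafkaRDD` shapes from `pyspark/streaming` at the time of this PR; `kafka_transform_func` and `transformed_kafka_dstream` are made-up names for illustration, and the actual wiring in this PR may differ:

    ```python
    # Module paths assumed from pyspark at the time of this PR.
    from pyspark.streaming.dstream import TransformedDStream
    from pyspark.streaming.kafka import KafkaRDD
    from pyspark.streaming.util import TransformFunction


    def kafka_transform_func(ctx, func, *deserializers):
        # Kafka-specific factory: build the TransformFunction and attach the
        # KafkaRDD wrapper here, instead of overriding a method on a
        # TransformedDStream subclass.
        return TransformFunction(ctx, func, *deserializers) \
            .rdd_wrapper(lambda jrdd, ctx, ser: KafkaRDD(jrdd, ctx, ser))


    def transformed_kafka_dstream(prev, func):
        # Hand the plain function to TransformedDStream through the new
        # optional constructor argument shown in the diff above.
        return TransformedDStream(prev, func, transformFunc=kafka_transform_func)
    ```

    The idea is just that the Kafka-specific wrapping lives in a plain function handed to the constructor, rather than in a method of a DStream subclass.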
    
    I will run some more tests to see if there is another way.
    


