masseyke opened a new pull request #34838:
URL: https://github.com/apache/spark/pull/34838
This commit adds support for RDDs containing ShortWritables to PySpark.
Currently, if a user calls sc.newAPIHadoopRDD() with an InputFormat that
produces ShortWritables, the call fails with an error like the one below,
because ShortWritable is not explicitly handled by PythonHadoopUtil.
```
>>> rdd = sc.newAPIHadoopRDD(inputFormatClass="org.elasticsearch.hadoop.mr.EsInputFormat",
...                          keyClass="org.apache.hadoop.io.NullWritable",
...                          valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
...                          conf=conf)
2021-12-08 14:38:40,439 ERROR scheduler.TaskSetManager: task 0.0 in stage 15.0 (TID 31) had a not serializable result: org.apache.hadoop.io.ShortWritable
Serialization stack:
    - object not serializable (class: org.apache.hadoop.io.ShortWritable, value: 1)
    - writeObject data (class: java.util.HashMap)
    - object (class java.util.HashMap, {price=1})
    - field (class: scala.Tuple2, name: _2, type: class java.lang.Object)
    - object (class scala.Tuple2, (1,{price=1}))
    - element of array (index: 0)
    - array (class [Lscala.Tuple2;, size 1); not retrying
Traceback (most recent call last):
  File "<stdin>", line 4, in <module>
  File "/home/hduser/spark-3.1.2-bin-hadoop3.2/python/pyspark/context.py", line 853, in newAPIHadoopRDD
    jconf, batchSize)
  File "/home/hduser/spark-3.1.2-bin-hadoop3.2/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1305, in __call__
  File "/home/hduser/spark-3.1.2-bin-hadoop3.2/python/pyspark/sql/utils.py", line 111, in deco
    return f(*a, **kw)
  File "/home/hduser/spark-3.1.2-bin-hadoop3.2/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.newAPIHadoopRDD.
: org.apache.spark.SparkException: Job aborted due to stage failure: task 0.0 in stage 15.0 (TID 31) had a not serializable result: org.apache.hadoop.io.ShortWritable
Serialization stack:
    - object not serializable (class: org.apache.hadoop.io.ShortWritable, value: 1)
    - writeObject data (class: java.util.HashMap)
    - object (class java.util.HashMap, {price=1})
    - field (class: scala.Tuple2, name: _2, type: class java.lang.Object)
    - object (class scala.Tuple2, (1,{price=1}))
    - element of array (index: 0)
    - array (class [Lscala.Tuple2;, size 1)
    at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2258)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2207)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2206)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2206)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1079)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1079)
    at scala.Option.foreach(Option.scala:407)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1079)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2445)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2387)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2376)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:868)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2196)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2217)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2236)
    at org.apache.spark.rdd.RDD.$anonfun$take$1(RDD.scala:1449)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
    at org.apache.spark.rdd.RDD.take(RDD.scala:1422)
    at org.apache.spark.api.python.SerDeUtil$.pairRDDToPython(SerDeUtil.scala:173)
    at org.apache.spark.api.python.PythonRDD$.newAPIHadoopRDD(PythonRDD.scala:385)
    at org.apache.spark.api.python.PythonRDD.newAPIHadoopRDD(PythonRDD.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)
```
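For context, PythonHadoopUtil converts the Hadoop Writable keys and values produced by the InputFormat into plain JVM types before the records are serialized and shipped to Python. The sketch below is illustrative rather than the exact patch: it assumes a simplified version of the Writable-to-Java match in org.apache.spark.api.python.PythonHadoopUtil (object and method names here are made up for the sketch) and shows where a ShortWritable case slots in alongside the other primitive Writables.
```scala
import org.apache.hadoop.io._

// Illustrative sketch of the Writable-to-Java conversion done in
// org.apache.spark.api.python.PythonHadoopUtil. Simplified: the real
// converter handles more types (BytesWritable, ArrayWritable,
// MapWritable, ...) and recurses into container Writables.
object WritableConversionSketch {
  def convertWritable(writable: Writable): Any = writable match {
    case iw: IntWritable     => iw.get()
    case lw: LongWritable    => lw.get()
    case fw: FloatWritable   => fw.get()
    case dw: DoubleWritable  => dw.get()
    case bw: BooleanWritable => bw.get()
    case t: Text             => t.toString
    // The case this PR adds: without it, a ShortWritable falls through
    // to the default branch unconverted and later fails Java
    // serialization, as in the stack trace above.
    case sw: ShortWritable   => sw.get()
    case _: NullWritable     => null
    case other               => other // unhandled Writables pass through as-is
  }
}
```
With a ShortWritable case in place, short values coming out of an InputFormat such as EsInputFormat should reach Python as plain ints rather than as unserializable ShortWritable objects.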