GitHub user MLnick opened a pull request:
https://github.com/apache/spark/pull/455
[WIP] SPARK-1416: PySpark support for SequenceFile and Hadoop InputFormats
So I finally resurrected this PR. It seems the old one against the
incubator mirror is no longer available, so I cannot reference it.
This adds initial support for reading Hadoop `SequenceFile`s, as well
as arbitrary Hadoop `InputFormat`s, in PySpark.
# Overview
The basics are as follows:
1. The `PythonRDD` object contains the relevant methods, which are in turn
invoked by `SparkContext` in PySpark
2. The SequenceFile or InputFormat is read on the Scala side, and the
`Writable` instances are converted to the relevant Scala classes (in the
case of primitives)
3. `msgpack` is used to serialize the Scala objects. An attempt is made to
register custom classes with `msgpack`; if this fails, the fallback is
`toString`
4. `MsgpackSerializer` on the Python side deserializes using
`msgpack-python`.
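The register-then-fall-back behaviour in step 3 can be sketched in pure Python (a rough illustration only - `json` stands in for `msgpack` here, and the real conversion happens on the Scala side):

```python
import json

def serialize_record(obj):
    """Serialize a record if it is a simple type; otherwise fall back
    to its string representation (mirroring the toString fallback used
    when registering a custom class with msgpack fails)."""
    try:
        # Primitives, lists and dicts serialize directly
        return json.dumps(obj)
    except TypeError:
        # Unregistered custom classes fall back to a string form
        return json.dumps(str(obj))
```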
This works "out of the box" for simple `Writable`s:
* `Text`
* `IntWritable`, `DoubleWritable`
* `NullWritable`
* `BooleanWritable`
* `BytesWritable`
* `MapWritable`
It also works for simple, "struct-like" classes, with fields and a no-arg
constructor.
I've tested it out with `EsInputFormat` and it works very nicely:
```python
conf = {"es.resource": "index/type"}
rdd = sc.newAPIHadoopRDD(
    "org.elasticsearch.hadoop.mr.EsInputFormat",
    "org.apache.hadoop.io.NullWritable",
    "org.elasticsearch.hadoop.mr.LinkedMapWritable",
    conf=conf)
rdd.first()
```
I suspect that for things like HBase/Cassandra it will be a bit trickier
to get working out of the box.
# Some things still outstanding:
1. This requires `msgpack-python` and will fail without it. As originally
discussed with Josh, add an `as_strings` argument that defaults to
`False` and can be used if `msgpack-python` is not available
2. I see from https://github.com/apache/spark/pull/363 that Pyrolite is
being used there for SerDe between Scala and Python. @ahirreddy @mateiz what is
the plan here - is Pyrolite preferred? From a cursory glance it seems
that adapting the `msgpack`-based SerDe here to use Pyrolite wouldn't be
too hard
3. Support a key and value "wrapper" that would allow a Scala/Java
function to be plugged in to transform whatever the key/value `Writable`
class is into something that can be serialized (e.g. convert some custom
`Writable` to a case class or map that can be easily serialized)
4. Support `saveAsSequenceFile`, `saveAsHadoopFile`, etc. This
would require SerDe in the reverse direction if it is to be compatible with
Java too. Perhaps Pyrolite would make this quite simple?
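The key/value "wrapper" in point 3 could amount to a simple conversion hook, sketched here in Python (names and signature are hypothetical; in the actual design the hook would be a Scala/Java function applied before serialization):

```python
def apply_wrappers(records, key_wrapper=None, value_wrapper=None):
    """Apply optional key/value conversion functions to each pair,
    defaulting to the identity when no wrapper is supplied."""
    key_wrapper = key_wrapper or (lambda k: k)
    value_wrapper = value_wrapper or (lambda v: v)
    return [(key_wrapper(k), value_wrapper(v)) for k, v in records]

# E.g. coerce custom Writable-like keys to plain strings before SerDe
pairs = apply_wrappers([(1, {"f": 2})], key_wrapper=str)
```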
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/MLnick/spark-1 pyspark-inputformats
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/spark/pull/455.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #455
----
commit d86325f394e536c6b2d5cb86d08e35d508fb23d7
Author: Nick Pentreath <[email protected]>
Date: 2013-12-09T06:53:11Z
Initial WIP of PySpark support for SequenceFile and arbitrary Hadoop
InputFormat
commit 4b0a43fa2236ed997a5f9bd273151f21c1cc23f8
Author: Nick Pentreath <[email protected]>
Date: 2013-12-12T10:31:18Z
Refactoring utils into own objects. Cleaning up old commented-out code
commit c304cc8ae1a39b3c58f21153e7311cfab7dfac9e
Author: Nick Pentreath <[email protected]>
Date: 2013-12-15T06:32:37Z
Adding supporting sequncefiles for tests. Cleaning up
commit 4e7c9e359bbbb66fd658270af266b406ffedfa05
Author: Nick Pentreath <[email protected]>
Date: 2013-12-15T09:29:16Z
Merge remote-tracking branch 'upstream/master' into pyspark-inputformats
Conflicts:
core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
project/SparkBuild.scala
commit 818a1e6fd6c25a689629358f540de54d32f9acbf
Author: Nick Pentreath <[email protected]>
Date: 2013-12-15T14:32:45Z
Add seqencefile and Hadoop InputFormat support to PythonRDD
commit 4294cbb9ea89fdcb61993bbf903e6ec157b08164
Author: Nick Pentreath <[email protected]>
Date: 2013-12-19T09:06:04Z
Add old Hadoop api methods. Clean up and expand comments. Clean up argument
names
commit 0f5cd844fe63b7dd9a3e0a920f9a26a25b9f5d1a
Author: Nick Pentreath <[email protected]>
Date: 2013-12-19T09:06:37Z
Remove unused pair UTF8 class. Add comments to msgpack deserializer
commit f1d73e3cad7ba7db176ed7cb48c901318e732266
Author: Nick Pentreath <[email protected]>
Date: 2013-12-19T09:07:22Z
mergeConfs returns a copy rather than mutating one of the input arguments
commit 4d7ef2ecc4de91daae4f0de9724930b7f32dc96a
Author: Nick Pentreath <[email protected]>
Date: 2013-12-19T09:08:38Z
Fix indentation
commit eb400366ee7617c7c92682d49f64079092d94031
Author: Nick Pentreath <[email protected]>
Date: 2013-12-19T11:48:40Z
Remove unused comment lines
commit 1c8efbc4a87cd7d719d8fef4e41781c67b414a6f
Author: Nick Pentreath <[email protected]>
Date: 2014-01-13T11:45:31Z
Merge remote-tracking branch 'upstream/master' into pyspark-inputformats
Conflicts:
project/SparkBuild.scala
python/pyspark/context.py
commit 619c0fa2452829882628ed2c5cee06f1dae68f6f
Author: Nick Pentreath <[email protected]>
Date: 2014-01-20T08:49:02Z
Merge remote-tracking branch 'upstream/master' into pyspark-inputformats
Conflicts:
project/SparkBuild.scala
commit 703ee65c5feb402f798c7f820466fef8fd78334a
Author: Nick Pentreath <[email protected]>
Date: 2014-01-20T09:37:57Z
Add back msgpack
commit 174f52011ecfa8b6d7cb37592d5b0ea2bca9e9b5
Author: Nick Pentreath <[email protected]>
Date: 2014-01-20T09:54:26Z
Add back graphx settings
commit 795a763a39db9830d4f1917147b076ec46b6d238
Author: Nick Pentreath <[email protected]>
Date: 2014-01-20T12:14:01Z
Change name to WriteInputFormatTestDataGenerator. Cleanup some var names.
Use SPARK_HOME in path for writing test sequencefile data.
commit 2beeedb6afccdf4a6d27cc7550a82ec581a0f54a
Author: Nick Pentreath <[email protected]>
Date: 2014-02-08T15:29:34Z
Merge remote-tracking branch 'upstream/master' into pyspark-inputformats
Conflicts:
core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
python/pyspark/context.py
commit 97ef708a4730bc79cccd56d1106eeddb474d58fc
Author: Nick Pentreath <[email protected]>
Date: 2014-02-14T16:27:02Z
Remove old writeToStream
commit 41856a51666a0ed94dc7063cf33a459b0352e302
Author: Nick Pentreath <[email protected]>
Date: 2014-03-19T08:00:17Z
Merge branch 'master' into pyspark-inputformats
Conflicts:
core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
commit f2d76a0c18fda6204e511d8b673b5adc749e67f1
Author: Nick Pentreath <[email protected]>
Date: 2014-03-19T08:10:26Z
Merge branch 'master' into pyspark-inputformats
Conflicts:
project/SparkBuild.scala
commit e67212a6e3fdf13930fbb6e2b8c328eb371c237b
Author: Nick Pentreath <[email protected]>
Date: 2014-03-19T08:11:01Z
Add back msgpack dependency
commit dd579220dc2d41b546f3a82f8bd5d1b82a2a19cf
Author: Nick Pentreath <[email protected]>
Date: 2014-04-10T06:47:20Z
Merge remote-tracking branch 'upstream/master' into pyspark-inputformats
Conflicts:
core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
project/SparkBuild.scala
python/pyspark/context.py
python/pyspark/serializers.py
commit d72bf18c92589f22335f1119e702d88f263a3145
Author: Nick Pentreath <[email protected]>
Date: 2014-04-10T07:45:37Z
msgpack
commit 0c612e5c56a1a9ff33535be69242b6cd42774c70
Author: Nick Pentreath <[email protected]>
Date: 2014-04-12T13:32:52Z
Merge branch 'master' into pyspark-inputformats
commit 65360d59247808db0c7db260c34b477e3cfb6b56
Author: Nick Pentreath <[email protected]>
Date: 2014-04-18T13:41:36Z
Adding test SequenceFiles
commit 25da1ca8dd2cf8117342c322ee86f9afd2e52f29
Author: Nick Pentreath <[email protected]>
Date: 2014-04-18T13:42:39Z
Add generator for nulls, bools, bytes and maps
commit 7237263b3f23591c1969df475641aac65bb9ecfe
Author: Nick Pentreath <[email protected]>
Date: 2014-04-18T13:44:08Z
Add back msgpack serializer and hadoop file code lost during merging
commit a67dfadc02bd0fde78a3de76d03a5e26a870dd44
Author: Nick Pentreath <[email protected]>
Date: 2014-04-18T13:44:46Z
Clean up Msgpack serialization and registering
commit 1bbbfb07c3a3da2c06a0f491eddb864eb9dc79f0
Author: Nick Pentreath <[email protected]>
Date: 2014-04-18T13:45:22Z
Clean up SparkBuild from merge
commit 9d2256e06e4e63eed47791f049f4702b8c55773d
Author: Nick Pentreath <[email protected]>
Date: 2014-04-18T14:18:38Z
Merge branch 'master' into pyspark-inputformats
Conflicts:
core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
project/SparkBuild.scala
commit f6aac55ef7a78569e79e9ee5b5582ee1f2ce56d5
Author: Nick Pentreath <[email protected]>
Date: 2014-04-18T14:19:17Z
Bring back msgpack
----