parallelize uses the default serializer (PickleSerializer) while textFile uses UTF8Deserializer; the two RDDs are serialized differently, which is what produces the ClassCastException quoted below.
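You can confirm the mismatch by peeking at each RDD's _jrdd_deserializer attribute (a private, internal field, so treat this only as a debugging aid; the s3n:/myinput path is the one from the original post):

>>> index = sc.parallelize(range(6), 6)
>>> lines = sc.textFile('s3n:/myinput', minPartitions=6)
>>> index._jrdd_deserializer    # a batched PickleSerializer
>>> lines._jrdd_deserializer    # UTF8Deserializer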
You can get around this with index.zip(input_data._reserialize()), or equivalently index.zip(input_data.map(lambda x: x)), either of which pushes the text RDD back through the default pickle serializer. But if you try just that, you hit a second problem: the two RDDs end up with different numbers of partitions (a complete sketch of Davies' zip-free approach is at the bottom of this mail):

>>> index.zip(input_data._reserialize()).count()
Py4JJavaError: An error occurred while calling o60.collect.
: java.lang.IllegalArgumentException: Can't zip RDDs with unequal
numbers of partitions
        at org.apache.spark.rdd.ZippedRDD.getPartitions(ZippedRDD.scala:55)

On Wed, Jul 30, 2014 at 7:53 AM, Davies Liu <dav...@databricks.com> wrote:

> On Mon, Jul 28, 2014 at 12:58 PM, lllll <lishu...@gmail.com> wrote:
> > I have a file in s3 that I want to map each line with an index. Here is
> > my code:
> >
> > >>> input_data = sc.textFile('s3n:/myinput', minPartitions=6).cache()
> > >>> N = input_data.count()
> > >>> index = sc.parallelize(range(N), 6)
> > >>> index.zip(input_data).collect()
>
> I think you cannot do zipWithIndex() this way, because the number of
> lines in each partition of input_data will be different from index. You
> need to get the exact number of lines in each partition first, then
> generate the correct index. It is easy to do with mapPartitions():
>
> >>> nums = input_data.mapPartitions(lambda it: [sum(1 for i in it)]).collect()
> >>> starts = [sum(nums[:i]) for i in range(len(nums))]
> >>> zipped = input_data.mapPartitionsWithIndex(
> ...     lambda i, it: ((starts[i] + j, x) for j, x in enumerate(it)))
>
> > ...
> > 14/07/28 19:49:31 INFO DAGScheduler: Completed ResultTask(18, 4)
> > 14/07/28 19:49:31 INFO DAGScheduler: Stage 18 (collect at <stdin>:1)
> > finished in 0.031 s
> > 14/07/28 19:49:31 INFO SparkContext: Job finished: collect at <stdin>:1,
> > took 0.039999707 s
> > Traceback (most recent call last):
> >   File "<stdin>", line 1, in <module>
> >   File "/root/spark/python/pyspark/rdd.py", line 584, in collect
> >     return list(self._collect_iterator_through_file(bytesInJava))
> >   File "/root/spark/python/pyspark/rdd.py", line 592, in _collect_iterator_through_file
> >     self.ctx._writeToFile(iterator, tempFile.name)
> >   File "/root/spark/python/lib/py4j-0.8.1-src.zip/py4j/java_gateway.py", line 537, in __call__
> >   File "/root/spark/python/lib/py4j-0.8.1-src.zip/py4j/protocol.py", line 300, in get_return_value
> > py4j.protocol.Py4JJavaError: An error occurred while calling
> > z:org.apache.spark.api.python.PythonRDD.writeToFile.
> > : java.lang.ClassCastException: java.lang.String cannot be cast to [B
> >         at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$3.apply(PythonRDD.scala:312)
> >         at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$3.apply(PythonRDD.scala:309)
> >         at scala.collection.Iterator$class.foreach(Iterator.scala:727)
> >         at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
> >         at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:309)
> >         at org.apache.spark.api.python.PythonRDD$.writeToFile(PythonRDD.scala:342)
> >         at org.apache.spark.api.python.PythonRDD$.writeToFile(PythonRDD.scala:337)
> >         at org.apache.spark.api.python.PythonRDD.writeToFile(PythonRDD.scala)
> >         at sun.reflect.GeneratedMethodAccessor24.invoke(Unknown Source)
> >         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> >         at java.lang.reflect.Method.invoke(Method.java:606)
> >         at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
> >         at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
> >         at py4j.Gateway.invoke(Gateway.java:259)
> >         at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
> >         at py4j.commands.CallCommand.execute(CallCommand.java:79)
> >         at py4j.GatewayConnection.run(GatewayConnection.java:207)
> >         at java.lang.Thread.run(Thread.java:744)
> >
> > As I see it, the job has completed, but I don't understand what is going
> > on with 'String cannot be cast to [B'. I tried to zip two
> > ParallelCollectionRDDs and it worked fine. But here I have a MappedRDD at
> > textFile. Not sure what's going on here.
>
> Could you provide a script and dataset to reproduce this error? Maybe
> there are some corner cases during serialization.
>
> > Also, why does Python not have zipWithIndex()?
>
> PySpark still has far fewer features than Spark itself; hopefully it will
> catch up over the next two releases.
>
> > Thanks for any help.
> >
> > --
> > View this message in context:
> > http://apache-spark-user-list.1001560.n3.nabble.com/zip-two-RDD-in-pyspark-tp10806.html
> > Sent from the Apache Spark User List mailing list archive at Nabble.com.
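P.S. Putting Davies' mapPartitions() suggestion together in one place, here is a minimal sketch (assuming the cached 6-partition text file from the original post). Since it never calls zip(), neither the serializer mismatch nor the partition-count check comes into play:

>>> input_data = sc.textFile('s3n:/myinput', minPartitions=6).cache()
>>> # count the lines in each partition, in partition order
>>> nums = input_data.mapPartitions(lambda it: [sum(1 for i in it)]).collect()
>>> # global index at which each partition starts
>>> starts = [sum(nums[:i]) for i in range(len(nums))]
>>> # pair every line with its global index, partition by partition
>>> zipped = input_data.mapPartitionsWithIndex(
...     lambda i, it: ((starts[i] + j, x) for j, x in enumerate(it)))
>>> zipped.take(3)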