Hi there,

If you are OK with working with DataFrames, you can do the following:
https://gist.github.com/zouzias/44723de11222535223fe59b4b0bc228c

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructField, StructType, LongType}

val df = sc.parallelize(Seq(
  (1.0, 2.0), (0.0, -1.0),
  (3.0, 4.0), (6.0, -2.3))).toDF("x", "y")

// Append a "rowid" column of type Long to the schema
val newSchema = StructType(df.schema.fields ++ Array(StructField("rowid", LongType, false)))

// Zip with an index at the RDD level
val rddWithId = df.rdd.zipWithIndex

// Convert back to a DataFrame
val dfZippedWithId = spark.createDataFrame(
  rddWithId.map { case (row, index) => Row.fromSeq(row.toSeq ++ Array(index)) },
  newSchema)

// Show the results
dfZippedWithId.show

Best,
Anastasios

On Wed, Sep 13, 2017 at 5:07 PM, 沈志宏 <blue...@cnic.cn> wrote:
> Hello, since Dataset has no zip(...) method, I wrote the following code to
> zip two datasets:
>
> 1  def zipDatasets[X: Encoder, Y: Encoder](spark: SparkSession, m:
>      Dataset[X], n: Dataset[Y]) = {
> 2    val rdd = m.rdd.zip(n.rdd);
> 3    import spark.implicits._
> 4    spark.createDataset(rdd);
> 5  }
>
> However, the m.rdd.zip(…) call reports a compile error: No ClassTag
> available for Y
>
> I know this error can be fixed by declaring Y with a ClassTag, like this:
>
> 1  def foo[X: Encoder, Y: ClassTag](spark: SparkSession, …
>
> But that makes line 4 report a new error:
> Unable to find encoder for type stored in a Dataset.
>
> Now I have no idea how to solve this problem. How can Y be declared with
> both an Encoder and a ClassTag?
>
> Many thanks!
>
> Best regards,
> bluejoe
> ---------------------------------------------------------------------
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>

--
Anastasios Zouzias <a...@zurich.ibm.com>
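For the record, the original question (declaring Y with both an Encoder and a ClassTag) can also be answered directly: Scala allows multiple context bounds on one type parameter, and an Encoder for the pair type can be built explicitly with Encoders.tuple rather than relying on spark.implicits._, which cannot derive tuple encoders for abstract type parameters. A minimal sketch, untested against a live Spark session:

import org.apache.spark.sql.{Dataset, Encoder, Encoders, SparkSession}
import scala.reflect.ClassTag

// Y needs a ClassTag because RDD.zip requires one for its argument's
// element type; both X and Y need Encoders so an Encoder[(X, Y)] can
// be assembled with Encoders.tuple for createDataset.
def zipDatasets[X: Encoder, Y: Encoder : ClassTag](
    spark: SparkSession, m: Dataset[X], n: Dataset[Y]): Dataset[(X, Y)] = {
  implicit val pairEnc: Encoder[(X, Y)] =
    Encoders.tuple(implicitly[Encoder[X]], implicitly[Encoder[Y]])
  spark.createDataset(m.rdd.zip(n.rdd))
}

Note that RDD.zip assumes both datasets have the same number of partitions and the same number of elements per partition, so this works reliably only when both sides come from the same lineage.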