Hi there,

If it is OK with you to work with DataFrames, you can do the following:

https://gist.github.com/zouzias/44723de11222535223fe59b4b0bc228c

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{LongType, StructField, StructType}
import spark.implicits._   // needed for the .toDF(...) conversion below

val df = sc.parallelize(Seq(
  (1.0, 2.0), (0.0, -1.0),
  (3.0, 4.0), (6.0, -2.3))).toDF("x", "y")

// Append a "rowid" column of type Long to the schema
val newSchema = StructType(df.schema.fields ++ Array(StructField("rowid", LongType, nullable = false)))

// Zip with an index on the RDD level
val rddWithId = df.rdd.zipWithIndex

// Convert back to a DataFrame, appending the index to every row
val dfZippedWithId = spark.createDataFrame(
  rddWithId.map { case (row, index) => Row.fromSeq(row.toSeq ++ Array(index)) },
  newSchema)

// Show results
dfZippedWithId.show
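
If the actual goal is to zip two DataFrames of the same length, the same trick can be applied to both sides and the results joined on the generated id. A rough sketch along those lines (addRowId is just a hypothetical helper wrapping the snippet above):

import org.apache.spark.sql.DataFrame

// Hypothetical helper: append a "rowid" column via zipWithIndex, as in the snippet above
def addRowId(df: DataFrame): DataFrame = {
  val withId = df.rdd.zipWithIndex.map { case (row, idx) => Row.fromSeq(row.toSeq :+ idx) }
  spark.createDataFrame(withId, StructType(df.schema.fields :+ StructField("rowid", LongType, nullable = false)))
}

// "Zip" two equally long DataFrames by joining on the shared rowid
val left  = addRowId(sc.parallelize(Seq(1.0, 2.0, 3.0)).toDF("a"))
val right = addRowId(sc.parallelize(Seq("p", "q", "r")).toDF("b"))
val zipped = left.join(right, "rowid").drop("rowid")
zipped.show

The join pairs rows by their zipWithIndex position but does not preserve the output order, so keep (or sort by) rowid if the row order matters downstream.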

Best,
Anastasios



On Wed, Sep 13, 2017 at 5:07 PM, 沈志宏 <blue...@cnic.cn> wrote:

> Hello, since Dataset has no zip(..) method, I wrote the following code to
> zip two datasets:
>
> 1       def zipDatasets[X: Encoder, Y: Encoder](spark: SparkSession, m: Dataset[X], n: Dataset[Y]) = {
> 2               val rdd = m.rdd.zip(n.rdd);
> 3               import spark.implicits._
> 4               spark.createDataset(rdd);
> 5       }
>
> However, in the m.rdd.zip(…) call, a compile error is reported:
> No ClassTag available for Y
>
> I know this error can be fixed if I declare Y as a ClassTag, like this:
>
> 1       def foo[X: Encoder, Y: ClassTag](spark: SparkSession, …
>
> But this will make line 5 report a new error:
>         Unable to find encoder for type stored in a Dataset.
>
> Now I have no idea how to solve this problem. How can I declare Y as both an
> Encoder and a ClassTag?
>
> Many thanks!
>
> Best regards,
> bluejoe
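
P.S. On the question of declaring Y with both bounds: Scala allows several context bounds on one type parameter, so something along these lines might compile (untested sketch; building the pair encoder explicitly via Encoders.tuple is my assumption about what createDataset needs here, since the implicits alone may not derive Encoder[(X, Y)] for abstract X and Y):

import scala.reflect.ClassTag
import org.apache.spark.sql.{Dataset, Encoder, Encoders, SparkSession}

// Sketch only: Y carries both an Encoder and a ClassTag context bound
def zipDatasets[X: Encoder, Y: Encoder: ClassTag](spark: SparkSession, m: Dataset[X], n: Dataset[Y]): Dataset[(X, Y)] = {
  val rdd = m.rdd.zip(n.rdd)   // zip is what needs the ClassTag[Y]
  // Assumption: build the pair encoder explicitly from the two component encoders
  implicit val pairEnc: Encoder[(X, Y)] = Encoders.tuple(implicitly[Encoder[X]], implicitly[Encoder[Y]])
  spark.createDataset(rdd)     // createDataset is what needs the Encoder[(X, Y)]
}

The DataFrame route above sidesteps the encoder question entirely, but this may help if the typed API is preferred.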


-- 
-- Anastasios Zouzias
<a...@zurich.ibm.com>
