[jira] [Commented] (SPARK-23074) Dataframe-ified zipwithindex
[ https://issues.apache.org/jira/browse/SPARK-23074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17770658#comment-17770658 ]

ZygD commented on SPARK-23074:

The problem is not solved! This was incorrectly closed. [The linked issue|https://issues.apache.org/jira/browse/SPARK-24042] is about arrays, and this is not.

> Dataframe-ified zipwithindex
>
> Key: SPARK-23074
> URL: https://issues.apache.org/jira/browse/SPARK-23074
> Project: Spark
> Issue Type: New Feature
> Components: Spark Core
> Affects Versions: 2.3.0
> Reporter: Ruslan Dautkhanov
> Priority: Minor
> Labels: bulk-closed, dataframe, rdd
>
> Would be great to have a dataframe-friendly equivalent of rdd.zipWithIndex():
> {code:java}
> import org.apache.spark.sql.DataFrame
> import org.apache.spark.sql.types.{LongType, StructField, StructType}
> import org.apache.spark.sql.Row
>
> def dfZipWithIndex(
>   df: DataFrame,
>   offset: Int = 1,
>   colName: String = "id",
>   inFront: Boolean = true
> ) : DataFrame = {
>   df.sqlContext.createDataFrame(
>     df.rdd.zipWithIndex.map(ln =>
>       Row.fromSeq(
>         (if (inFront) Seq(ln._2 + offset) else Seq())
>           ++ ln._1.toSeq ++
>         (if (inFront) Seq() else Seq(ln._2 + offset))
>       )
>     ),
>     StructType(
>       (if (inFront) Array(StructField(colName,LongType,false)) else Array[StructField]())
>         ++ df.schema.fields ++
>       (if (inFront) Array[StructField]() else Array(StructField(colName,LongType,false)))
>     )
>   )
> }
> {code}
> credits: [https://stackoverflow.com/questions/30304810/dataframe-ified-zipwithindex]

--
This message was sent by Atlassian Jira (v8.20.10#820010)

To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-23074) Dataframe-ified zipwithindex
[ https://issues.apache.org/jira/browse/SPARK-23074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16460137#comment-16460137 ]

Ruslan Dautkhanov commented on SPARK-23074:

That's . When we have a "record" that spans multiple text lines, and it happens that some lines are in one partition and the rest of the lines are in another partition, what would monotonically_increasing_id() return? It wouldn't be consecutive, right?

See [https://stackoverflow.com/a/48454000/470583] - people are creating quite expensive workarounds:
{code:scala}
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.LongType

def zipWithIndex(df: DataFrame, offset: Long = 1, indexName: String = "index") = {
  // Tag every row with its partition and a per-partition increasing id.
  val dfWithPartitionId = df
    .withColumn("partition_id", spark_partition_id())
    .withColumn("inc_id", monotonically_increasing_id())

  // One offset per partition: number of rows in earlier partitions, minus the
  // partition's first inc_id, plus the requested starting offset.
  val partitionOffsets = dfWithPartitionId
    .groupBy("partition_id")
    .agg(count(lit(1)) as "cnt", first("inc_id") as "inc_id")
    .orderBy("partition_id")
    .select(sum("cnt").over(Window.orderBy("partition_id")) - col("cnt") - col("inc_id") + lit(offset) as "cnt")
    .collect()
    .map(_.getLong(0))
    .toArray

  // Shift each row's inc_id by its partition's offset to get the final index.
  dfWithPartitionId
    .withColumn("partition_offset", udf((partitionId: Int) => partitionOffsets(partitionId), LongType)(col("partition_id")))
    .withColumn(indexName, col("partition_offset") + col("inc_id"))
    .drop("partition_id", "partition_offset", "inc_id")
}
{code}
Do you see an easier way to do this? Thanks!
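The arithmetic behind the workaround quoted in this comment can be sketched without Spark. The following is a minimal plain-Scala illustration, assuming partitions are modelled as nested sequences; `PartitionOffsetSketch`, `startOffsets`, `zipWithGlobalIndex` and the sample data are hypothetical names introduced here, not Spark APIs. Each partition's starting index is the requested offset plus the number of rows in all earlier partitions, and each row's index is that start plus its position within the partition.

```scala
// Plain-Scala sketch (no Spark) of the offset arithmetic behind a
// zipWithIndex-style numbering. All names here are hypothetical.
object PartitionOffsetSketch {
  // Starting index of each partition: the offset plus the sizes of all earlier partitions.
  def startOffsets(sizes: Seq[Long], offset: Long): Seq[Long] =
    sizes.scanLeft(offset)(_ + _).init

  // Pair every row with (start of its partition) + (position within the partition).
  def zipWithGlobalIndex[A](partitions: Seq[Seq[A]], offset: Long = 1L): Seq[(A, Long)] = {
    val starts = startOffsets(partitions.map(_.size.toLong), offset)
    partitions.zip(starts).flatMap { case (rows, start) =>
      rows.zipWithIndex.map { case (row, i) => (row, start + i) }
    }
  }

  def main(args: Array[String]): Unit = {
    // Hypothetical sample data: three partitions of sizes 3, 1 and 2.
    val partitions = Seq(Seq("a", "b", "c"), Seq("d"), Seq("e", "f"))
    zipWithGlobalIndex(partitions).foreach(println)
  }
}
```

In this model only the per-partition sizes need to be gathered in one place; the rows themselves never have to be sorted or moved, which appears to be the property the workaround is after.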
[jira] [Commented] (SPARK-23074) Dataframe-ified zipwithindex
[ https://issues.apache.org/jira/browse/SPARK-23074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16460116#comment-16460116 ]

Reynold Xin commented on SPARK-23074:

It should give you the same ordering - that's the "monotonically increasing" part.
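For context on the ordering question: the Spark documentation for monotonically_increasing_id describes the current implementation as putting the partition ID in the upper 31 bits and the record number within each partition in the lower 33 bits. The plain-Scala sketch below (no Spark required) mimics that layout; `MonotonicIdSketch`, `sketchId` and the sample partition sizes are hypothetical names for illustration, not Spark APIs. It suggests why such IDs follow partition order yet are not consecutive across partitions.

```scala
// Sketch of the documented ID layout of monotonically_increasing_id():
// partition ID in the upper 31 bits, record number in the lower 33 bits.
// `sketchId` is a hypothetical helper, not part of Spark.
object MonotonicIdSketch {
  def sketchId(partitionId: Int, recordNumber: Long): Long =
    (partitionId.toLong << 33) + recordNumber

  def main(args: Array[String]): Unit = {
    // Hypothetical input: two partitions holding three and two rows.
    val partitionSizes = Seq(3, 2)
    val ids = for {
      (size, pid) <- partitionSizes.zipWithIndex
      rec <- 0 until size
    } yield sketchId(pid, rec.toLong)
    println(ids.mkString(", "))
    // prints: 0, 1, 2, 8589934592, 8589934593
  }
}
```

The IDs increase in partition order, but the jump of 2^33 at the partition boundary means they cannot be used directly as a gap-free row index.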
[jira] [Commented] (SPARK-23074) Dataframe-ified zipwithindex
[ https://issues.apache.org/jira/browse/SPARK-23074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16460087#comment-16460087 ]

Ruslan Dautkhanov commented on SPARK-23074:

[~rxin], monotonically_increasing_id wouldn't guarantee the same physical order as in the file behind a dataframe (let's say a dataframe that was created with spark-csv), would it? If it doesn't, then no, we would still need dfZipWithIndex in some form: some of our use cases require interpreting the content of file rows based on their position relative to each other. That's why we need zipWithIndex here. Thanks.
[jira] [Commented] (SPARK-23074) Dataframe-ified zipwithindex
[ https://issues.apache.org/jira/browse/SPARK-23074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16459979#comment-16459979 ]

Reynold Xin commented on SPARK-23074:

Can your problem be solved by monotonically_increasing_id, rather than index? Index is expensive.
[jira] [Commented] (SPARK-23074) Dataframe-ified zipwithindex
[ https://issues.apache.org/jira/browse/SPARK-23074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16326489#comment-16326489 ]

Ruslan Dautkhanov commented on SPARK-23074:

Yep, there are use cases where the ordering is given by the source, like reading files. We run production jobs that have to go back to the RDD API just to call zipWithIndex, which is neither as straightforward nor as performant as it could be if there were a dataframe-level API for zipWithIndex(). That SO answer got almost 30 upvotes in 2 years, so I know we're not alone and it could benefit many others. Thanks.
[jira] [Commented] (SPARK-23074) Dataframe-ified zipwithindex
[ https://issues.apache.org/jira/browse/SPARK-23074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16326479#comment-16326479 ]

Sean Owen commented on SPARK-23074:

Hm, rowNumber requires you to sort the input? I didn't think it did, semantically. The numbering isn't so meaningful unless the input has a defined ordering, sure, but the same is true of an RDD. Unless you sort it, the indexing could change when it's evaluated again. You're not really guaranteed what order you see the data in, although in practice, like in your example, you will get data from things like files in the order you expect.
[jira] [Commented] (SPARK-23074) Dataframe-ified zipwithindex
[ https://issues.apache.org/jira/browse/SPARK-23074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16326455#comment-16326455 ]

Ruslan Dautkhanov commented on SPARK-23074:

{quote}You can create a DataFrame from the result of .zipWithIndex on an RDD, as you see here.{quote}
That's very kludgy, as you can see from the code snippet in the issue description. Direct dataframe-level support for this RDD-level API would be super awesome to have.

{quote}There's already a rowNumber function in Spark SQL, however, which sounds like the native equivalent?{quote}
That's not the same. rowNumber requires an expression to `order by`. What if there is no such column? For example, we often ingest files into Spark where the physical position of a record determines how that record has to be processed. I can give an exact example if necessary. zipWithIndex() is actually the only API call that preserves such information from the original source (even though the data can be distributed across multiple partitions etc.).

Also, as folks in that Stack Overflow question said, the rowNumber approach is way slower (which is not surprising, as it requires sorting the data).