[jira] [Commented] (SPARK-23074) Dataframe-ified zipwithindex

2023-09-30 Thread ZygD (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-23074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17770658#comment-17770658
 ] 

ZygD commented on SPARK-23074:
--

The problem is not solved! This issue was closed incorrectly. [The linked 
issue|https://issues.apache.org/jira/browse/SPARK-24042] is about arrays; this one is not.

> Dataframe-ified zipwithindex
> 
>
> Key: SPARK-23074
> URL: https://issues.apache.org/jira/browse/SPARK-23074
> Project: Spark
>  Issue Type: New Feature
>  Components: Spark Core
>Affects Versions: 2.3.0
>Reporter: Ruslan Dautkhanov
>Priority: Minor
>  Labels: bulk-closed, dataframe, rdd
>
> Would be great to have a dataframe-friendly equivalent of rdd.zipWithIndex():
> {code:scala}
> import org.apache.spark.sql.DataFrame
> import org.apache.spark.sql.Row
> import org.apache.spark.sql.types.{LongType, StructField, StructType}
>
> def dfZipWithIndex(
>     df: DataFrame,
>     offset: Int = 1,
>     colName: String = "id",
>     inFront: Boolean = true
> ): DataFrame = {
>   df.sqlContext.createDataFrame(
>     // pair every row with its global index, then place the index column
>     // in front of (or behind) the original columns
>     df.rdd.zipWithIndex.map { case (row, idx) =>
>       Row.fromSeq(
>         (if (inFront) Seq(idx + offset) else Seq()) ++
>           row.toSeq ++
>           (if (inFront) Seq() else Seq(idx + offset))
>       )
>     },
>     // extend the original schema with a non-nullable index column
>     StructType(
>       (if (inFront) Array(StructField(colName, LongType, nullable = false))
>        else Array[StructField]()) ++
>         df.schema.fields ++
>         (if (inFront) Array[StructField]()
>          else Array(StructField(colName, LongType, nullable = false)))
>     )
>   )
> }
> {code}
> credits: 
> [https://stackoverflow.com/questions/30304810/dataframe-ified-zipwithindex]
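
For illustration, a minimal sketch of how dfZipWithIndex might be called (the `people` DataFrame is hypothetical, and an active SparkSession named `spark` is assumed to be in scope):

{code:scala}
import spark.implicits._   // assumes `spark` is an active SparkSession

// hypothetical input DataFrame
val people = Seq(("alice", 34), ("bob", 36), ("carol", 29)).toDF("name", "age")

// prepend a 1-based, non-nullable LongType "id" column following the current row order
val withId = dfZipWithIndex(people)   // resulting schema: id, name, age
withId.show()
{code}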






[jira] [Commented] (SPARK-23074) Dataframe-ified zipwithindex

2018-05-01 Thread Ruslan Dautkhanov (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16460137#comment-16460137
 ] 

Ruslan Dautkhanov commented on SPARK-23074:
---

That's the issue. When we have a "record" that spans multiple text lines, and it happens 
that some of those lines land in one partition and the rest in another, what would 
monotonically_increasing_id() return? It wouldn't be consecutive, right?

See [https://stackoverflow.com/a/48454000/470583] - people are building quite an 
expensive workaround:

{code:scala}
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.LongType

def zipWithIndex(df: DataFrame, offset: Long = 1, indexName: String = "index"): DataFrame = {
  // tag every row with its partition id and a monotonically increasing (but gapped) id
  val dfWithPartitionId = df
    .withColumn("partition_id", spark_partition_id())
    .withColumn("inc_id", monotonically_increasing_id())

  // per partition, compute the value to add to inc_id so the final index is consecutive
  val partitionOffsets = dfWithPartitionId
    .groupBy("partition_id")
    .agg(count(lit(1)) as "cnt", first("inc_id") as "inc_id")
    .orderBy("partition_id")
    .select(sum("cnt").over(Window.orderBy("partition_id")) - col("cnt") - col("inc_id") + lit(offset) as "cnt")
    .collect()
    .map(_.getLong(0))
    .toArray

  // shift inc_id by the partition offset and drop the helper columns
  dfWithPartitionId
    .withColumn("partition_offset",
      udf((partitionId: Int) => partitionOffsets(partitionId), LongType)(col("partition_id")))
    .withColumn(indexName, col("partition_offset") + col("inc_id"))
    .drop("partition_id", "partition_offset", "inc_id")
}
{code}

Do you see an easier way to do this? Thanks!
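
For reference, a minimal sketch of calling the workaround above (the sample DataFrame is hypothetical and an active SparkSession named `spark` is assumed):

{code:scala}
// hypothetical input spread over several partitions
val df = spark.range(0, 10).toDF("value").repartition(3)

// appends a consecutive "index" column starting at `offset`, regardless of partitioning
val indexed = zipWithIndex(df, offset = 1L, indexName = "index")
indexed.orderBy("index").show()
{code}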
 







[jira] [Commented] (SPARK-23074) Dataframe-ified zipwithindex

2018-05-01 Thread Reynold Xin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16460116#comment-16460116
 ] 

Reynold Xin commented on SPARK-23074:
-

It should give you the same ordering - that's the "monotonically increasing" 
part.
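
A small illustration of that (a sketch; per the documented current implementation, the IDs encode the partition index in the upper 31 bits and the per-partition record number in the lower 33 bits, so they increase with row order but are not consecutive across partitions):

{code:scala}
import org.apache.spark.sql.functions.monotonically_increasing_id

// hypothetical 6-row DataFrame split across 2 partitions
val df = spark.range(0, 6).repartition(2).toDF("value")

df.withColumn("mid", monotonically_increasing_id()).show()
// rows in partition 0 get ids 0, 1, 2, ...
// rows in partition 1 start at 1L << 33 = 8589934592:
// monotonically increasing within the scan, but with large gaps between partitions
{code}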







[jira] [Commented] (SPARK-23074) Dataframe-ified zipwithindex

2018-05-01 Thread Ruslan Dautkhanov (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16460087#comment-16460087
 ] 

Ruslan Dautkhanov commented on SPARK-23074:
---

[~rxin], wouldn't monotonically_increasing_id fail to guarantee the same physical order as 
in the file behind a dataframe (say, a dataframe created with spark-csv)? If it doesn't, 
then we would still need dfZipWithIndex in some form for the use cases where we interpret 
a file's rows based on their relative position to each other - that's why we need 
zipWithIndex here. Thanks.







[jira] [Commented] (SPARK-23074) Dataframe-ified zipwithindex

2018-05-01 Thread Reynold Xin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16459979#comment-16459979
 ] 

Reynold Xin commented on SPARK-23074:
-

Can your problem be solved by monotonically_increasing_id, rather than index? 
Index is expensive.

 







[jira] [Commented] (SPARK-23074) Dataframe-ified zipwithindex

2018-01-15 Thread Ruslan Dautkhanov (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16326489#comment-16326489
 ] 

Ruslan Dautkhanov commented on SPARK-23074:
---

Yep, there are use cases where ordering is provided, like reading files. We run 
production jobs that have to drop back to the RDD API just to do zipWithIndex, which is 
neither as straightforward nor as performant as a dataframe-level zipWithIndex() API would be.

That SO answer got almost 30 upvotes in 2 years, so I know we're not alone and this 
could benefit many others.

Thanks.







[jira] [Commented] (SPARK-23074) Dataframe-ified zipwithindex

2018-01-15 Thread Sean Owen (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16326479#comment-16326479
 ] 

Sean Owen commented on SPARK-23074:
---

Hm, rowNumber requires you to sort the input? I didn't think it did, 
semantically. The numbering isn't so meaningful unless the input has a defined 
ordering, sure, but the same is true of an RDD. Unless you sort it, the 
indexing could change when it's evaluated again.

You're not really guaranteed what order you see the data in, although in practice, 
as in your example, you will get data from things like files in the order you expect.
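
A minimal sketch of that caveat (the sort column "ts" and the input `df` are hypothetical; dfZipWithIndex is the helper from the issue description):

{code:scala}
// with an explicit sort, the assigned indices are stable across re-evaluations
val stable = dfZipWithIndex(df.orderBy("ts"))

// without one, the indices follow whatever order the scan happens to produce
val unstable = dfZipWithIndex(df)
{code}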







[jira] [Commented] (SPARK-23074) Dataframe-ified zipwithindex

2018-01-15 Thread Ruslan Dautkhanov (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16326455#comment-16326455
 ] 

Ruslan Dautkhanov commented on SPARK-23074:
---

{quote}You can create a DataFrame from the result of .zipWithIndex on an RDD, 
as you see here.{quote}

That's very kludgy, as you can see from the code snippet above. Having this RDD-level 
functionality available directly at the dataframe level would be super awesome.

{quote}There's already a rowNumber function in Spark SQL, however, which sounds 
like the native equivalent?{quote}

That's not the same. rowNumber requires an expression to `order by` on. What if 
there is no such column? For example, we often ingest files into Spark where the 
physical position of a record determines how that record has to be processed.
I can give an exact example if necessary. zipWithIndex() is actually the only 
API call that preserves that information from the original source (even though 
the data can end up distributed across multiple partitions, etc.).
Also, as folks in that stackoverflow question said, the rowNumber approach is way 
slower (which is not surprising, since it requires sorting the data).




