rdblue commented on a change in pull request #25115: [SPARK-28351][SQL] Support DELETE in DataSource V2
URL: https://github.com/apache/spark/pull/25115#discussion_r308333464
##########
File path: sql/core/src/test/scala/org/apache/spark/sql/sources/v2/TestInMemoryTableCatalog.scala
##########
@@ -252,6 +255,45 @@ class InMemoryTable(
withData(messages.map(_.asInstanceOf[BufferedRows]))
}
}
+
+  override def deleteWhere(filters: Array[Filter]): Unit = dataMap.synchronized {
+    val filtered = data.map {
+      rows =>
+        val newRows = filter(rows.rows, filters)
+        val newBufferedRows = new BufferedRows()
+        newBufferedRows.rows.appendAll(newRows)
+        newBufferedRows
+    }.filter(_.rows.nonEmpty)
+    dataMap.clear()
+    withData(filtered)
+  }
+
+  def filter(rows: mutable.ArrayBuffer[InternalRow],
+      filters: Array[Filter]): Array[InternalRow] = {
+    if (rows.isEmpty) {
+      rows.toArray
+    }
+    val filterStr =
+      filters.map {
+        filter => filter.sql
Review comment:
I think it is over-complicated to add a conversion from Filter to a SQL
string just so this can parse that filter back into an Expression. I'd prefer a
conversion back from Filter to Expression, but I don't think either one is
needed.
The overwrite support can run equality filters, which is enough for matching
partition keys. I recommend using that and supporting only partition-level
deletes in test tables. That way, the table also rejects some delete
expressions that are not on partition columns and we can add tests that
validate Spark's behavior for those cases.
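
A minimal sketch of the partition-only approach described above could look like the following. This is illustrative only: the class name PartitionOnlyDeleteTable, the dataMap layout (rows keyed by partition values), and the error messages are assumptions, not code from this PR.

import scala.collection.mutable

import org.apache.spark.sql.sources.{EqualTo, Filter}

// Hypothetical test table: rows are grouped by their partition key values.
class PartitionOnlyDeleteTable(partitionColumns: Seq[String]) {
  private val dataMap = mutable.Map.empty[Seq[Any], mutable.ArrayBuffer[Seq[Any]]]

  def deleteWhere(filters: Array[Filter]): Unit = dataMap.synchronized {
    // Only equality filters on partition columns are honored; anything else is
    // rejected, which lets tests cover Spark's behavior for unsupported deletes.
    val matchingKeys = dataMap.keys.filter { partValues =>
      filters.forall {
        case EqualTo(attr, value) =>
          val idx = partitionColumns.indexOf(attr)
          if (idx < 0) {
            throw new IllegalArgumentException(
              s"Cannot delete by non-partition column: $attr")
          }
          partValues(idx) == value
        case other =>
          throw new IllegalArgumentException(s"Unsupported delete filter: $other")
      }
    }.toList
    matchingKeys.foreach(dataMap.remove)
  }
}

For example, with partitionColumns = Seq("p"), deleteWhere(Array(EqualTo("p", 3))) drops the whole p=3 partition, while a filter on a data column throws, so a test can assert how Spark surfaces that error.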