JiaqiWang18 commented on code in PR #51507:
URL: https://github.com/apache/spark/pull/51507#discussion_r2213864934
##########
sql/connect/server/src/main/scala/org/apache/spark/sql/connect/pipelines/PipelinesHandler.scala:
##########
@@ -224,6 +225,64 @@ private[connect] object PipelinesHandler extends Logging {
sessionHolder: SessionHolder): Unit = {
val dataflowGraphId = cmd.getDataflowGraphId
val graphElementRegistry =
DataflowGraphRegistry.getDataflowGraphOrThrow(dataflowGraphId)
+
Review Comment:
extracted a
[createTableFilters](https://github.com/apache/spark/pull/51507/commits/1693ac546225c8a6be1d96eb5e64fcf03f77a344#diff-44e47ef13083c7fae6bd89d2774a8141eec6e76874424aaf4d9dc93f59362210R315)
function
##########
sql/connect/server/src/main/scala/org/apache/spark/sql/connect/pipelines/PipelinesHandler.scala:
##########
@@ -224,6 +225,64 @@ private[connect] object PipelinesHandler extends Logging {
sessionHolder: SessionHolder): Unit = {
val dataflowGraphId = cmd.getDataflowGraphId
val graphElementRegistry =
DataflowGraphRegistry.getDataflowGraphOrThrow(dataflowGraphId)
+
+ // Extract refresh parameters from protobuf command
+ val fullRefreshTables = cmd.getFullRefreshList.asScala.toSeq
+ val fullRefreshAll = cmd.getFullRefreshAll
+ val refreshTables = cmd.getRefreshList.asScala.toSeq
+
+ // Convert table names to fully qualified TableIdentifier objects
+ def parseTableNames(tableNames: Seq[String]): Set[TableIdentifier] = {
+ tableNames.map { name =>
+ GraphIdentifierManager
+ .parseAndQualifyTableIdentifier(
+ rawTableIdentifier =
+ GraphIdentifierManager.parseTableIdentifier(name,
sessionHolder.session),
+ currentCatalog = Some(graphElementRegistry.defaultCatalog),
+ currentDatabase = Some(graphElementRegistry.defaultDatabase))
+ .identifier
+ }.toSet
+ }
+
+ if (fullRefreshTables.nonEmpty && fullRefreshAll) {
+ throw new IllegalArgumentException(
+ "Cannot specify a subset to refresh when full refresh all is set to
true.")
+ }
+
+ if (refreshTables.nonEmpty && fullRefreshAll) {
+ throw new IllegalArgumentException(
+ "Cannot specify a subset to full refresh when full refresh all is set
to true.")
+ }
+ val refreshTableNames = parseTableNames(refreshTables)
+ val fullRefreshTableNames = parseTableNames(fullRefreshTables)
+
+ if (refreshTables.nonEmpty && fullRefreshTables.nonEmpty) {
+ // check if there is an intersection between the subset
+ val intersection = refreshTableNames.intersect(fullRefreshTableNames)
+ if (intersection.nonEmpty) {
+ throw new IllegalArgumentException(
+ "Datasets specified for refresh and full refresh cannot overlap: " +
+ s"${intersection.mkString(", ")}")
+ }
+ }
+
+ val fullRefreshTablesFilter: TableFilter = if (fullRefreshAll) {
+ AllTables
+ } else if (fullRefreshTables.nonEmpty) {
+ SomeTables(fullRefreshTableNames)
+ } else {
+ NoTables
+ }
+
+ val refreshTablesFilter: TableFilter =
+ if (refreshTables.nonEmpty) {
+ SomeTables(refreshTableNames)
+ } else if (fullRefreshTablesFilter != NoTables) {
+ NoTables
+ } else {
+ AllTables
+ }
Review Comment:
extracted a
[createTableFilters](https://github.com/apache/spark/pull/51507/commits/1693ac546225c8a6be1d96eb5e64fcf03f77a344#diff-44e47ef13083c7fae6bd89d2774a8141eec6e76874424aaf4d9dc93f59362210R315)
function
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]