aakash-db commented on code in PR #51003: URL: https://github.com/apache/spark/pull/51003#discussion_r2114351481
########## sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/DataflowGraphTransformer.scala: ########## @@ -0,0 +1,374 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.pipelines.graph + +import java.util.concurrent.{ + ConcurrentHashMap, + ConcurrentLinkedDeque, + ConcurrentLinkedQueue, + ExecutionException, + Future +} + +import scala.collection.mutable.ArrayBuffer +import scala.jdk.CollectionConverters._ +import scala.util.control.NoStackTrace + +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.util.ThreadUtils + +/** + * Resolves the [[DataflowGraph]] by processing each node in the graph. This class exposes visitor + * functionality to resolve/analyze graph nodes. + * We only expose simple visitor abilities to transform different entities of the + * graph. + * For advanced transformations we also expose a mechanism to walk the graph over entity by entity. + * + * Assumptions: + * 1. Each output will have at-least 1 flow to it. + * 2. Each flow may or may not have a destination table. If a flow does not have a destination + * table, the destination is a view. 
+ * + * The graph is structured so that flows, tables and sinks are all graph elements (nodes). + * While we expose transformation functions for each of these entities, we also expose a way to + * walk over the graph. + * + * Constructor is private as all usages should be via + * DataflowGraphTransformer.withDataflowGraphTransformer. + * @param graph: Any Dataflow Graph + */ +class DataflowGraphTransformer(graph: DataflowGraph) extends AutoCloseable { + import DataflowGraphTransformer._ + + private var tables: Seq[Table] = graph.tables + private var tableMap: Map[TableIdentifier, Table] = computeTableMap() + private var flows: Seq[Flow] = graph.flows + private var flowsTo: Map[TableIdentifier, Seq[Flow]] = computeFlowsTo() + private var views: Seq[View] = graph.views + private var viewMap: Map[TableIdentifier, View] = computeViewMap() + + // Fail analysis nodes + // Failed flows are flows that failed to resolve, whose inputs are not available, or whose + // destination failed to resolve. + private var failedFlows: Seq[ResolutionCompletedFlow] = Seq.empty + // We define a dataset as having failed to resolve if: + // 1. It is a destination of a flow that is unresolved. + private var failedTables: Seq[Table] = Seq.empty + + private val parallelism = 10 + + // Executor used to resolve nodes in parallel. It is lazily initialized to avoid creating it + // for scenarios where it's not required. To track if the lazy val was evaluated or not we use a + // separate variable so we know if we need to shut down the executor or not. 
+ private var fixedPoolExecutorInitialized = false + lazy private val fixedPoolExecutor = { + fixedPoolExecutorInitialized = true + ThreadUtils.newDaemonFixedThreadPool( + parallelism, + prefix = "data-flow-graph-transformer-" + ) + } + private val selfExecutor = ThreadUtils.sameThreadExecutorService() + + private def computeTableMap(): Map[TableIdentifier, Table] = synchronized { + tables.map(table => table.identifier -> table).toMap + } + + private def computeViewMap(): Map[TableIdentifier, View] = synchronized { + views.map(view => view.identifier -> view).toMap + } + + private def computeFlowsTo(): Map[TableIdentifier, Seq[Flow]] = synchronized { + flows.groupBy(_.destinationIdentifier) + } + + def transformTables(transformer: Table => Table): DataflowGraphTransformer = { + tables = tables.map(transformer) Review Comment: yeah, true. done. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org