HyukjinKwon commented on code in PR #43784:
URL: https://github.com/apache/spark/pull/43784#discussion_r1391970013
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceManager.scala:
##########
@@ -75,3 +87,106 @@ class DataSourceManager {
dataSourceBuilders.containsKey(normalize(name))
}
}
+
+/**
+ * Data Source V1 default source wrapper for Python Data Source.
+ */
+abstract class PythonDefaultSource
+ extends RelationProvider
+ with SchemaRelationProvider
+ with DataSourceRegister {
+
+ override def createRelation(
+ sqlContext: SQLContext, parameters: Map[String, String]): BaseRelation =
+ new PythonRelation(shortName(), sqlContext, parameters, None)
+
+ override def createRelation(
+ sqlContext: SQLContext,
+ parameters: Map[String, String],
+ schema: StructType): BaseRelation =
+ new PythonRelation(shortName(), sqlContext, parameters, Some(schema))
+}
+
+/**
+ * Data Source V1 relation wrapper for Python Data Source.
+ */
+class PythonRelation(
+ source: String,
+ override val sqlContext: SQLContext,
+ parameters: Map[String, String],
+ maybeSchema: Option[StructType]) extends BaseRelation with TableScan {
+
+ private lazy val sourceDf: DataFrame = {
+ val caseInsensitiveMap = CaseInsensitiveMap(parameters)
+ // TODO(SPARK-45600): should be session-based.
+ val builder =
sqlContext.sparkSession.sharedState.dataSourceManager.lookupDataSource(source)
+ val plan = builder(
+ sqlContext.sparkSession, source, caseInsensitiveMap.get("path").toSeq,
+ maybeSchema, caseInsensitiveMap)
+ Dataset.ofRows(sqlContext.sparkSession, plan)
+ }
+
+ override def schema: StructType = sourceDf.schema
+
+ override def buildScan(): RDD[Row] = sourceDf.rdd
+}
+
+/**
+ * This object is responsible for generating a class for Python Data Source
+ * that inherits Scala Data Source interface so other features work together
+ * with Python Data Source.
+ */
+object PythonDataSourceCodeGenerator extends Logging {
+ /**
+ * When you invoke `generateClass`, it generates a class that inherits
[[PythonDefaultSource]]
+ * that has a different short name. The generated class name as follows:
+ * "org.apache.spark.sql.execution.datasources.$shortName.DefaultSource".
+ *
+ * The `shortName` should be registered via `spark.dataSource.register`
first, then
+ * this method can corresponding Scala Data Source wrapper for the Python
Data Source.
+ *
+ * @param shortName The short name registered for Python Data Source.
+ * @return
+ */
+ def generateClass(shortName: String): Class[_] = {
+ val ctx = newCodeGenContext()
+
+ val codeBody = s"""
+ @Override
+ public String shortName() {
+ return "${StringEscapeUtils.escapeJava(shortName)}";
+ }"""
+
+ val evaluator = new ClassBodyEvaluator()
+ val parentClassLoader = new
ParentClassLoader(Utils.getContextOrSparkClassLoader)
+ evaluator.setParentClassLoader(parentClassLoader)
+ evaluator.setClassName(
+
s"org.apache.spark.sql.execution.python.datasources.$shortName.DefaultSource")
Review Comment:
@rednaxelafx I can test it right away but just in case you know direct
answer. Does this overwrite the class when this is called with the name class
name?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]