Github user jose-torres commented on a diff in the pull request:
https://github.com/apache/spark/pull/21305#discussion_r190696654
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala ---
@@ -240,21 +238,27 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
     val cls = DataSource.lookupDataSource(source, df.sparkSession.sessionState.conf)
     if (classOf[DataSourceV2].isAssignableFrom(cls)) {
-      val ds = cls.newInstance()
-      ds match {
+      val source = cls.newInstance().asInstanceOf[DataSourceV2]
+      source match {
         case ws: WriteSupport =>
-          val options = new DataSourceOptions((extraOptions ++
-            DataSourceV2Utils.extractSessionConfigs(
-              ds = ds.asInstanceOf[DataSourceV2],
-              conf = df.sparkSession.sessionState.conf)).asJava)
-          // Using a timestamp and a random UUID to distinguish different writing jobs. This is good
-          // enough as there won't be tons of writing jobs created at the same second.
-          val jobId = new SimpleDateFormat("yyyyMMddHHmmss", Locale.US)
-            .format(new Date()) + "-" + UUID.randomUUID()
-          val writer = ws.createWriter(jobId, df.logicalPlan.schema, mode, options)
-          if (writer.isPresent) {
+          val options = extraOptions ++
+              DataSourceV2Utils.extractSessionConfigs(source, df.sparkSession.sessionState.conf)
+
+          val relation = DataSourceV2Relation.create(source, options.toMap)
+          if (mode == SaveMode.Append) {
             runCommand(df.sparkSession, "save") {
-              WriteToDataSourceV2(writer.get(), df.logicalPlan)
+              AppendData.byName(relation, df.logicalPlan)
+            }
+
+          } else {
+            val writer = ws.createWriter(
+              UUID.randomUUID.toString, df.logicalPlan.output.toStructType, mode,
--- End diff --
nit: probably put the timestamp back to minimize the change
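
For illustration, a minimal sketch of what restoring the timestamp prefix could look like, based on the jobId construction from the removed lines (assuming java.text.SimpleDateFormat and java.util.{Date, Locale, UUID} are imported as before):

    import java.text.SimpleDateFormat
    import java.util.{Date, Locale, UUID}

    // Timestamp plus random UUID, as in the removed code: good enough to
    // distinguish writing jobs, since few jobs are created in the same second.
    val jobId = new SimpleDateFormat("yyyyMMddHHmmss", Locale.US)
      .format(new Date()) + "-" + UUID.randomUUID()
    // This jobId would then replace UUID.randomUUID.toString in the
    // ws.createWriter(...) call above.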