Following up on this older thread. Looking at the implementation of DataFrameWriter, it doesn't seem possible to use the schema from the DataFrame itself when writing out through a V2 interface?
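That said, as far as I can tell the input schema does still reach a custom connector later in the write path: SupportsWrite.newWriteBuilder() is handed a LogicalWriteInfo, and info.schema() is the schema of the DataFrame being written. A minimal sketch of a table that captures it (MyTable is a made-up name here, not anything from the real connector, and the BatchWrite body is left out):

import java.util

import org.apache.spark.sql.connector.catalog.{SupportsWrite, TableCapability}
import org.apache.spark.sql.connector.write.{BatchWrite, LogicalWriteInfo, WriteBuilder}
import org.apache.spark.sql.types.StructType

// Hypothetical table class. getTable() may have handed us a null schema, but
// the write path still provides the query schema through LogicalWriteInfo.
class MyTable(providedSchema: StructType, properties: util.Map[String, String])
  extends SupportsWrite {

  override def name(): String = "my-table"

  // Whatever getTable() received; can be null for a non-file V2 source.
  override def schema(): StructType = providedSchema

  override def capabilities(): util.Set[TableCapability] =
    util.EnumSet.of(TableCapability.BATCH_WRITE)

  override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = {
    // The schema of the DataFrame being written, even when getTable() saw null.
    val writeSchema: StructType = info.schema()
    val writeOptions = info.options()
    new WriteBuilder {
      override def buildForBatch(): BatchWrite = {
        // build the real BatchWrite from writeSchema / writeOptions here
        ???
      }
    }
  }
}

So the schema is not lost entirely; it just never shows up in getTable() for a non-file source.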
Looking at getTable() specifically: in order to have the DataFrame schema passed there, a custom write DataSource would need to extend FileDataSourceV2. However, in lookupV2Provider(), if the data source is a FileDataSourceV2 the lookup returns None, so maybeV2Provider.isDefined is always false and the V2 branch below is never taken for file sources. Snippet of DataFrameWriter.save():

val maybeV2Provider = lookupV2Provider()
if (maybeV2Provider.isDefined) {
  val provider = maybeV2Provider.get
  val sessionOptions = DataSourceV2Utils.extractSessionConfigs(
    provider, df.sparkSession.sessionState.conf)
  val options = sessionOptions ++ extraOptions
  val dsOptions = new CaseInsensitiveStringMap(options.asJava)

  def getTable: Table = {
    // For file source, it's expensive to infer schema/partition at each write. Here we pass
    // the schema of input query and the user-specified partitioning to `getTable`. If the
    // query schema is not compatible with the existing data, the write can still success but
    // following reads would fail.
    if (provider.isInstanceOf[FileDataSourceV2]) {
      provider.getTable(
        df.schema.asNullable,
        partitioningAsV2.toArray,
        dsOptions.asCaseSensitiveMap())
    } else {
      DataSourceV2Utils.getTableFromProvider(provider, dsOptions, userSpecifiedSchema = None)
    }
  }

For any other source, the else branch calls DataSourceV2Utils.getTableFromProvider with userSpecifiedSchema = None, so getTable() only ever sees whatever inferSchema() returns (a workaround sketch based on that is at the bottom of this message).

Richard Xin-2 wrote:
> Saw
>
> On Wednesday, July 8, 2020, 9:26 PM, Sricheta Ruj <Sricheta.Ruj@.com> wrote:
>
> Hello Spark Team
>
> I am trying to use the DataSourceV2 API from Spark 3.0. I wanted to ask: in
> case of write, how do I get the user-specified schema?
>
> This is what I am trying to achieve:
>
> val data = Seq(
>   Row("one", 1, true, 12.34, 6L, date, Decimal(999.00), timestamp, 2f, byteVal, shortVal),
>   Row("two", 1, false, 13.34, 7L, date, Decimal(3.3), timestamp, 3.59f, byteVal, shortVal)
> )
>
> val schema = new StructType()
>   .add(StructField("name", StringType, true))
>   .add(StructField("id", IntegerType, true))
>   .add(StructField("flag", BooleanType, true))
>   .add(StructField("salary", DoubleType, true))
>   .add(StructField("phone", LongType, true))
>   .add(StructField("dob", DateType, true))
>   .add(StructField("weight", DecimalType(Constants.DECIMAL_PRECISION, 7), true))
>   .add(StructField("time", TimestampType, true))
>   .add(StructField("float", FloatType, true))
>   .add(StructField("byte", ByteType, true))
>   .add(StructField("short", ShortType, true))
>
> val df = spark.createDataFrame(spark.sparkContext.parallelize(data), schema)
>
> // Create a new manifest and add the entity to it
> df.write.format("com.microsoft.cdm")
>   .option("storage", storageAccountName)
>   .option("container", outputContainer)
>   .option("manifest", "/root/default.manifest.cdm.json")
>   .option("entity", "TestEntity")
>   .option("format", "parquet")
>   .option("compression", "gzip")
>   .option("appId", appid).option("appKey", appkey).option("tenantId", tenantid)
>   .mode(SaveMode.Append)
>   .save()
>
> I have my custom DataSource implementation as follows:
>
> class DefaultSource extends DataSourceRegister with TableProvider {
>
>   override def shortName(): String = "spark-cdm"
>
>   override def inferSchema(options: CaseInsensitiveStringMap): StructType = null
>
>   override def inferPartitioning(options: CaseInsensitiveStringMap): Array[Transform] = {
>     getTable(null, null, options).partitioning
>   }
>
>   override def supportsExternalMetadata = true
>
>   override def getTable(schema: StructType, partitioning: Array[Transform],
>       properties: util.Map[String, String]): Table = {
>     println(schema)
>     new MysqlTable(schema, properties)
>   }
> }
>
> I get null here.
> I am not sure how I should get the StructType I created
> on df.write. Any help would be appreciated.
>
> Thanks and Regards,
> Sricheta Ruj
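Re the quoted question ("how do I get the user specified schema?"): since for a non-file source DataFrameWriter.save() calls getTableFromProvider with userSpecifiedSchema = None, the only hook that runs before getTable() is inferSchema(). One possible workaround, purely as a sketch (the "schemaDDL" option name is made up, and MyTable is the hypothetical table class from the sketch near the top of this message), is to pass the schema explicitly as a DDL string option and parse it there instead of returning null:

import java.util

import org.apache.spark.sql.connector.catalog.{Table, TableProvider}
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap

class DefaultSource extends TableProvider {

  override def supportsExternalMetadata(): Boolean = true

  // For non-file sources, DataFrameWriter.save() reaches getTable() through
  // inferSchema(), so derive the schema from an option instead of returning null.
  override def inferSchema(options: CaseInsensitiveStringMap): StructType = {
    val ddl = options.get("schemaDDL") // made-up option name for this sketch
    require(ddl != null, "pass .option(\"schemaDDL\", df.schema.toDDL) on write")
    StructType.fromDDL(ddl)
  }

  override def getTable(
      schema: StructType,
      partitioning: Array[Transform],
      properties: util.Map[String, String]): Table = {
    // schema is now whatever inferSchema() returned, no longer null
    new MyTable(schema, properties)
  }
}

On the write side that would mean adding something like .option("schemaDDL", df.schema.toDDL) to the df.write chain above. getTable() then sees the DataFrame's schema, and newWriteBuilder() still gets the authoritative input schema via LogicalWriteInfo either way.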