GitHub user cloud-fan commented on a diff in the pull request:
https://github.com/apache/spark/pull/11509#discussion_r55204271
--- Diff: mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala ---
@@ -167,22 +117,63 @@ class DefaultSource extends HadoopFsRelationProvider with DataSourceRegister {
throw new IOException(s"Illegal schema for libsvm data, schema=${dataSchema}")
}
}
+ override def inferSchema(
+ sqlContext: SQLContext,
+ options: Map[String, String],
+ files: Seq[FileStatus]): Option[StructType] = {
+ Some(
+ StructType(
+ StructField("label", DoubleType, nullable = false) ::
+ StructField("features", new VectorUDT(), nullable = false) :: Nil))
+ }
- override def createRelation(
+ override def prepareWrite(
sqlContext: SQLContext,
- paths: Array[String],
- dataSchema: Option[StructType],
- partitionColumns: Option[StructType],
- parameters: Map[String, String]): HadoopFsRelation = {
- val path = if (paths.length == 1) paths(0)
- else if (paths.isEmpty) throw new IOException("No input path specified for libsvm data")
- else throw new IOException("Multiple input paths are not supported for libsvm data")
- if (partitionColumns.isDefined && !partitionColumns.get.isEmpty) {
- throw new IOException("Partition is not supported for libsvm data")
+ job: Job,
+ options: Map[String, String],
+ dataSchema: StructType): OutputWriterFactory = {
+ new OutputWriterFactory {
+ override def newInstance(
+ path: String,
+ bucketId: Option[Int],
+ dataSchema: StructType,
+ context: TaskAttemptContext): OutputWriter = {
+ if (bucketId.isDefined) { sys.error("LibSVM doesn't support bucketing") }
+ new LibSVMOutputWriter(path, dataSchema, context)
+ }
+ }
+ }
+
+ override def buildInternalScan(
+ sqlContext: SQLContext,
+ dataSchema: StructType,
+ requiredColumns: Array[String],
+ filters: Array[Filter],
+ bucketSet: Option[BitSet],
+ inputFiles: Array[FileStatus],
+ broadcastedConf: Broadcast[SerializableConfiguration],
+ options: Map[String, String]): RDD[InternalRow] = {
+ // TODO: This does not handle cases where column pruning has been performed.
+
+ verifySchema(dataSchema)
--- End diff ---
Should we verify the schema earlier, i.e. in `prepareWrite`?
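
A minimal sketch of what that could look like, reusing the signatures and the `verifySchema` helper from the diff above:

```scala
override def prepareWrite(
    sqlContext: SQLContext,
    job: Job,
    options: Map[String, String],
    dataSchema: StructType): OutputWriterFactory = {
  // Fail fast on an invalid schema before any writer tasks are launched,
  // rather than deferring the check to the read path.
  verifySchema(dataSchema)
  new OutputWriterFactory {
    override def newInstance(
        path: String,
        bucketId: Option[Int],
        dataSchema: StructType,
        context: TaskAttemptContext): OutputWriter = {
      if (bucketId.isDefined) { sys.error("LibSVM doesn't support bucketing") }
      new LibSVMOutputWriter(path, dataSchema, context)
    }
  }
}
```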